Skip to content

Commit

Permalink
refactor: network topology package in scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed May 24, 2023
1 parent 1764fd7 commit 301d306
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 112 deletions.
53 changes: 27 additions & 26 deletions scheduler/networktopology/mocks/network_topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

104 changes: 39 additions & 65 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package networktopology

import (
"context"
"strconv"
"time"

"github.com/go-redis/redis/v8"

Expand All @@ -30,28 +30,33 @@ import (
"d7y.io/dragonfly/v2/scheduler/storage"
)

const (
// contextTimeout is the timeout of redis invoke.
contextTimeout = 2 * time.Minute
)

// NetworkTopology is an interface for network topology.
type NetworkTopology interface {
// ProbedCount is the number of times the host has been probed.
ProbedCount(hostID string) int64

// LoadDestHosts returns destination hosts for source host.
LoadDestHosts(hostID string) ([]string, bool)
// LoadDestHostIDs loads destination host ids by source host id.
LoadDestHostIDs(string) ([]string, error)

// DeleteHost deletes host.
DeleteHost(hostID string) error
DeleteHost(string) error

// StoreProbe stores probe between two hosts.
StoreProbe(src, dest string, probe *Probe) bool
// ProbedCount is the number of times the host has been probed.
ProbedCount(string) (uint64, error)

// LoadProbes loads probes by source host id and destination host id.
LoadProbes(string, string) Probes
}

// networkTopology is an implementation of network topology.
type networkTopology struct {
// Redis universal client interface.
rdb redis.UniversalClient

// Scheduler config.
config *config.Config
// config is the network topology config.
config config.NetworkTopologyConfig

// Resource interface.
resource resource.Resource
Expand All @@ -61,7 +66,7 @@ type networkTopology struct {
}

// New network topology interface.
func NewNetworkTopology(cfg *config.Config, rdb redis.UniversalClient, resource resource.Resource, storage storage.Storage) (NetworkTopology, error) {
func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalClient, resource resource.Resource, storage storage.Storage) (NetworkTopology, error) {
return &networkTopology{
config: cfg,
rdb: rdb,
Expand All @@ -70,79 +75,48 @@ func NewNetworkTopology(cfg *config.Config, rdb redis.UniversalClient, resource
}, nil
}

// ProbedCount is the number of times the host has been probed.
func (n *networkTopology) ProbedCount(hostID string) int64 {
value, err := n.rdb.Get(context.Background(), pkgredis.MakeProbedCountKeyInScheduler(hostID)).Result()
if err != nil {
return 0
}

probedCount, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return 0
}

return probedCount
}

// LoadDestHosts returns destination hosts for source host.
func (n *networkTopology) LoadDestHosts(hostID string) ([]string, bool) {
key := pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")
keys, err := n.rdb.Keys(context.Background(), key).Result()
if err != nil {
return []string{}, false
}
// LoadDestHostIDs loads destination host ids by source host id.
func (nt *networkTopology) LoadDestHostIDs(hostID string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

destHosts := make([]string, 0)
for _, k := range keys {
destHosts = append(destHosts, k[len(key)-1:])
}

return destHosts, true
return nt.rdb.Keys(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")).Result()
}

// DeleteHost deletes host.
func (n *networkTopology) DeleteHost(hostID string) error {
// Delete network topology.
if err := n.rdb.Del(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")).Err(); err != nil {
func (nt *networkTopology) DeleteHost(hostID string) error {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

if err := nt.rdb.Del(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")).Err(); err != nil {
return err
}

// Delete probes sent by the host.
if err := n.rdb.Del(context.Background(), pkgredis.MakeProbesKeyInScheduler(hostID, "*")).Err(); err != nil {
if err := nt.rdb.Del(ctx, pkgredis.MakeProbesKeyInScheduler(hostID, "*")).Err(); err != nil {
return err
}

// Delete probes sent to the host, and return the number of probes deleted for updating probed count.
count, err := n.rdb.Del(context.Background(), pkgredis.MakeProbesKeyInScheduler("*", hostID)).Result()
count, err := nt.rdb.Del(ctx, pkgredis.MakeProbesKeyInScheduler("*", hostID)).Result()
if err != nil {
return err
}

// Decrease probed count of host.
if err = n.rdb.DecrBy(context.Background(), pkgredis.MakeProbedCountKeyInScheduler(hostID), count).Err(); err != nil {
if err = nt.rdb.DecrBy(ctx, pkgredis.MakeProbedCountKeyInScheduler(hostID), count).Err(); err != nil {
return err
}

return nil
}

// StoreProbe stores probe between two hosts.
func (n *networkTopology) StoreProbe(src, dest string, probe *Probe) bool {
probes := NewProbes(n.rdb, n.config.NetworkTopology.Probe.QueueLength, src, dest)
if err := probes.Enqueue(probe); err != nil {
return false
}

// Update probe count.
if err := n.rdb.Incr(context.Background(), pkgredis.MakeProbedCountKeyInScheduler(src)).Err(); err != nil {
return false
}
// ProbedCount is the number of times the host has been probed.
func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

// Update probed count.
if err := n.rdb.Incr(context.Background(), pkgredis.MakeProbedCountKeyInScheduler(dest)).Err(); err != nil {
return false
}
return nt.rdb.Get(ctx, pkgredis.MakeProbedCountKeyInScheduler(hostID)).Uint64()
}

return true
// LoadProbes loads probes by source host id and destination host id.
func (nt *networkTopology) LoadProbes(srcHostID, destHostID string) Probes {
return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID)
}
40 changes: 20 additions & 20 deletions scheduler/networktopology/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

"github.com/go-redis/redis/v8"

pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/resource"
)

Expand Down Expand Up @@ -81,32 +81,32 @@ type Probes interface {
}

type probes struct {
// config is the probe config.
config config.ProbeConfig

// Redis universal client interface.
rdb redis.UniversalClient

// limit is the length limit of probe queue.
limit int

// src is the source host id.
src string
srcHostID string

// dest is the destination host id.
dest string
destHostID string
}

// NewProbes creates a probes interface.
func NewProbes(rdb redis.UniversalClient, limit int, src string, dest string) Probes {
func NewProbes(cfg config.ProbeConfig, rdb redis.UniversalClient, srcHostID string, destHostID string) Probes {
return &probes{
rdb: rdb,
limit: limit,
src: src,
dest: dest,
config: cfg,
rdb: rdb,
srcHostID: srcHostID,
destHostID: destHostID,
}
}

// Peek returns the oldest probe without removing it.
func (p *probes) Peek() (*Probe, bool) {
str, err := p.rdb.LIndex(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.src, p.dest), 0).Result()
str, err := p.rdb.LIndex(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0).Result()
if err != nil {
return nil, false
}
Expand All @@ -122,13 +122,13 @@ func (p *probes) Peek() (*Probe, bool) {
// Enqueue enqueues probe into the queue.
func (p *probes) Enqueue(probe *Probe) error {
length := p.Length()
if length == int64(p.limit) {
if length == int64(p.config.QueueLength) {
if _, ok := p.Dequeue(); !ok {
return errors.New("remove the oldest probe error")
}
}

probesKey := pkgredis.MakeProbesKeyInScheduler(p.src, p.dest)
probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)
data, err := json.Marshal(probe)
if err != nil {
return err
Expand All @@ -138,7 +138,7 @@ func (p *probes) Enqueue(probe *Probe) error {
return err
}

networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.src, p.dest)
networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID)
if length == 0 {
if _, err := p.rdb.Pipelined(context.Background(), func(rdb redis.Pipeliner) error {
rdb.HSet(context.Background(), networkTopologyKey, "averageRTT", probe.RTT.Nanoseconds())
Expand Down Expand Up @@ -180,7 +180,7 @@ func (p *probes) Enqueue(probe *Probe) error {

// Dequeue removes and returns the oldest probe.
func (p *probes) Dequeue() (*Probe, bool) {
str, err := p.rdb.LPop(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.src, p.dest)).Result()
str, err := p.rdb.LPop(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result()
if err != nil {
return nil, false
}
Expand All @@ -195,7 +195,7 @@ func (p *probes) Dequeue() (*Probe, bool) {

// Length gets the length of probes.
func (p *probes) Length() int64 {
length, err := p.rdb.LLen(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.src, p.dest)).Result()
length, err := p.rdb.LLen(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result()
if err != nil {
return 0
}
Expand All @@ -205,7 +205,7 @@ func (p *probes) Length() int64 {

// CreatedAt is the creation time of probes.
func (p *probes) CreatedAt() time.Time {
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.src, p.dest), "createdAt").Result()
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "createdAt").Result()
if err != nil {
return time.Time{}
}
Expand All @@ -220,7 +220,7 @@ func (p *probes) CreatedAt() time.Time {

// UpdatedAt is the updated time to store probe.
func (p *probes) UpdatedAt() time.Time {
value, err := p.rdb.HGet(context.Background(), fmt.Sprintf("network-topology:%s:%s", p.src, p.dest), "updatedAt").Result()
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt").Result()
if err != nil {
return time.Time{}
}
Expand All @@ -235,7 +235,7 @@ func (p *probes) UpdatedAt() time.Time {

// AverageRTT is the average round-trip time of probes.
func (p *probes) AverageRTT() time.Duration {
value, err := p.rdb.HGet(context.Background(), fmt.Sprintf("network-topology:%s:%s", p.src, p.dest), "averageRTT").Result()
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "averageRTT").Result()
if err != nil {
return time.Duration(0)
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err

// Initialize network topology service.
if cfg.NetworkTopology.Enable {
s.networkTopology, err = networktopology.NewNetworkTopology(cfg, rdb, resource, s.storage)
s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, resource, s.storage)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 301d306

Please sign in to comment.