Skip to content

Commit

Permalink
refactor: probes package in network topology (#2382)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed May 24, 2023
1 parent dec9948 commit b9c9b95
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 137 deletions.
28 changes: 16 additions & 12 deletions scheduler/networktopology/mocks/probes_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions scheduler/networktopology/network_topology.go
Expand Up @@ -52,16 +52,16 @@ type NetworkTopology interface {

// networkTopology is an implementation of network topology.
type networkTopology struct {
// Redis universal client interface.
// rdb is Redis universal client interface.
rdb redis.UniversalClient

// config is the network topology config.
config config.NetworkTopologyConfig

// Resource interface.
// resource is resource interface.
resource resource.Resource

// Storage interface.
// storage is storage interface.
storage storage.Storage
}

Expand Down
168 changes: 46 additions & 122 deletions scheduler/networktopology/probes.go
Expand Up @@ -21,8 +21,6 @@ package networktopology
import (
"context"
"encoding/json"
"errors"
"strconv"
"time"

"github.com/go-redis/redis/v8"
Expand All @@ -32,11 +30,7 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource"
)

const (
// DefaultMovingAverageWeight is the weight of the moving average.
DefaultMovingAverageWeight = 0.1
)

// Probe is the probe metadata.
type Probe struct {
// Host metadata.
Host *resource.Host `json:"host"`
Expand All @@ -57,40 +51,42 @@ func NewProbe(host *resource.Host, rtt time.Duration, createdAt time.Time) *Prob
}
}

// Probes is the interface to store probes.
type Probes interface {
// Peek returns the oldest probe without removing it.
Peek() (*Probe, bool)
Peek() (*Probe, error)

// Enqueue enqueues probe into the queue.
Enqueue(*Probe) error

// Dequeue removes and returns the oldest probe.
Dequeue() (*Probe, bool)
Dequeue() (*Probe, error)

// Length gets the length of probes.
Length() int64
Length() (int64, error)

// CreatedAt is the creation time of probes.
CreatedAt() time.Time
CreatedAt() (time.Time, error)

// UpdatedAt is the updated time to store probe.
UpdatedAt() time.Time
UpdatedAt() (time.Time, error)

// AverageRTT is the average round-trip time of probes.
AverageRTT() time.Duration
AverageRTT() (time.Duration, error)
}

// probes is the implementation of Probes.
type probes struct {
// config is the probe config.
config config.ProbeConfig

// Redis universal client interface.
// rdb is redis universal client interface.
rdb redis.UniversalClient

// src is the source host id.
// srcHostID is the source host id.
srcHostID string

// dest is the destination host id.
// destHostID is the destination host id.
destHostID string
}

Expand All @@ -105,145 +101,73 @@ func NewProbes(cfg config.ProbeConfig, rdb redis.UniversalClient, srcHostID stri
}

// Peek returns the oldest probe without removing it.
func (p *probes) Peek() (*Probe, bool) {
str, err := p.rdb.LIndex(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0).Result()
func (p *probes) Peek() (*Probe, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

rawProbe, err := p.rdb.LIndex(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0).Bytes()
if err != nil {
return nil, false
return nil, err
}

probe := &Probe{}
if err = json.Unmarshal([]byte(str), probe); err != nil {
return nil, false
if err = json.Unmarshal(rawProbe, probe); err != nil {
return nil, err
}

return probe, true
return probe, err
}

// TODO Implement function.
// Enqueue enqueues probe into the queue.
func (p *probes) Enqueue(probe *Probe) error {
length := p.Length()
if length == int64(p.config.QueueLength) {
if _, ok := p.Dequeue(); !ok {
return errors.New("remove the oldest probe error")
}
}

probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)
data, err := json.Marshal(probe)
if err != nil {
return err
}

if err = p.rdb.RPush(context.Background(), probesKey, data).Err(); err != nil {
return err
}

networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID)
if length == 0 {
if _, err := p.rdb.Pipelined(context.Background(), func(rdb redis.Pipeliner) error {
rdb.HSet(context.Background(), networkTopologyKey, "averageRTT", probe.RTT.Nanoseconds())
rdb.HSet(context.Background(), networkTopologyKey, "createdAt", probe.CreatedAt.UnixNano())
rdb.HSet(context.Background(), networkTopologyKey, "updatedAt", probe.CreatedAt.UnixNano())
return nil
}); err != nil {
return err
}

return nil
}
values, err := p.rdb.LRange(context.Background(), probesKey, 0, -1).Result()
if err != nil {
return err
}

var averageRTT time.Duration
for _, value := range values {
probe := &Probe{}
if err = json.Unmarshal([]byte(value), probe); err != nil {
return err
}

averageRTT = time.Duration(float64(averageRTT)*DefaultMovingAverageWeight +
float64(probe.RTT)*(1-DefaultMovingAverageWeight))
}

if _, err := p.rdb.Pipelined(context.Background(), func(rdb redis.Pipeliner) error {
rdb.HSet(context.Background(), networkTopologyKey, "averageRTT", averageRTT.Nanoseconds())
rdb.HSet(context.Background(), networkTopologyKey, "updatedAt", probe.CreatedAt.UnixNano())
return nil
}); err != nil {
return err
}

return nil
}

// Dequeue removes and returns the oldest probe.
func (p *probes) Dequeue() (*Probe, bool) {
str, err := p.rdb.LPop(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result()
func (p *probes) Dequeue() (*Probe, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

rawProbe, err := p.rdb.LPop(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Bytes()
if err != nil {
return nil, false
return nil, err
}

probe := &Probe{}
if err = json.Unmarshal([]byte(str), probe); err != nil {
return nil, false
if err = json.Unmarshal(rawProbe, probe); err != nil {
return nil, err
}

return probe, true
return probe, nil
}

// Length gets the length of probes.
func (p *probes) Length() int64 {
length, err := p.rdb.LLen(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result()
if err != nil {
return 0
}
func (p *probes) Length() (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

return length
return p.rdb.LLen(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result()
}

// CreatedAt is the creation time of probes.
func (p *probes) CreatedAt() time.Time {
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "createdAt").Result()
if err != nil {
return time.Time{}
}
func (p *probes) CreatedAt() (time.Time, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

nano, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return time.Time{}
}

return time.Unix(0, nano)
return p.rdb.HGet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "createdAt").Time()
}

// UpdatedAt is the updated time to store probe.
func (p *probes) UpdatedAt() time.Time {
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt").Result()
if err != nil {
return time.Time{}
}
func (p *probes) UpdatedAt() (time.Time, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

nano, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return time.Time{}
}

return time.Unix(0, nano)
return p.rdb.HGet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt").Time()
}

// TODO Implement function.
// AverageRTT is the average round-trip time of probes.
func (p *probes) AverageRTT() time.Duration {
value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "averageRTT").Result()
if err != nil {
return time.Duration(0)
}

nano, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return time.Duration(0)
}

return time.Duration(nano)
func (p *probes) AverageRTT() (time.Duration, error) {
return time.Duration(0), nil
}

0 comments on commit b9c9b95

Please sign in to comment.