diff --git a/scheduler/networktopology/mocks/probes_mock.go b/scheduler/networktopology/mocks/probes_mock.go index 7cdbc5d18ea..f5f8d89e292 100644 --- a/scheduler/networktopology/mocks/probes_mock.go +++ b/scheduler/networktopology/mocks/probes_mock.go @@ -36,11 +36,12 @@ func (m *MockProbes) EXPECT() *MockProbesMockRecorder { } // AverageRTT mocks base method. -func (m *MockProbes) AverageRTT() time.Duration { +func (m *MockProbes) AverageRTT() (time.Duration, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AverageRTT") ret0, _ := ret[0].(time.Duration) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // AverageRTT indicates an expected call of AverageRTT. @@ -50,11 +51,12 @@ func (mr *MockProbesMockRecorder) AverageRTT() *gomock.Call { } // CreatedAt mocks base method. -func (m *MockProbes) CreatedAt() time.Time { +func (m *MockProbes) CreatedAt() (time.Time, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreatedAt") ret0, _ := ret[0].(time.Time) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // CreatedAt indicates an expected call of CreatedAt. @@ -64,11 +66,11 @@ func (mr *MockProbesMockRecorder) CreatedAt() *gomock.Call { } // Dequeue mocks base method. -func (m *MockProbes) Dequeue() (*networktopology.Probe, bool) { +func (m *MockProbes) Dequeue() (*networktopology.Probe, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Dequeue") ret0, _ := ret[0].(*networktopology.Probe) - ret1, _ := ret[1].(bool) + ret1, _ := ret[1].(error) return ret0, ret1 } @@ -93,11 +95,12 @@ func (mr *MockProbesMockRecorder) Enqueue(arg0 interface{}) *gomock.Call { } // Length mocks base method. -func (m *MockProbes) Length() int64 { +func (m *MockProbes) Length() (int64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Length") ret0, _ := ret[0].(int64) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // Length indicates an expected call of Length. @@ -107,11 +110,11 @@ func (mr *MockProbesMockRecorder) Length() *gomock.Call { } // Peek mocks base method. -func (m *MockProbes) Peek() (*networktopology.Probe, bool) { +func (m *MockProbes) Peek() (*networktopology.Probe, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Peek") ret0, _ := ret[0].(*networktopology.Probe) - ret1, _ := ret[1].(bool) + ret1, _ := ret[1].(error) return ret0, ret1 } @@ -122,11 +125,12 @@ func (mr *MockProbesMockRecorder) Peek() *gomock.Call { } // UpdatedAt mocks base method. -func (m *MockProbes) UpdatedAt() time.Time { +func (m *MockProbes) UpdatedAt() (time.Time, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdatedAt") ret0, _ := ret[0].(time.Time) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // UpdatedAt indicates an expected call of UpdatedAt. diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index 790c9bb85de..6881d914ab4 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -52,16 +52,16 @@ type NetworkTopology interface { // networkTopology is an implementation of network topology. type networkTopology struct { - // Redis universal client interface. + // rdb is Redis universal client interface. rdb redis.UniversalClient // config is the network topology config. config config.NetworkTopologyConfig - // Resource interface. + // resource is resource interface. resource resource.Resource - // Storage interface. + // storage is storage interface. storage storage.Storage } diff --git a/scheduler/networktopology/probes.go b/scheduler/networktopology/probes.go index c87c8cb39d3..6e4de37dc98 100644 --- a/scheduler/networktopology/probes.go +++ b/scheduler/networktopology/probes.go @@ -21,8 +21,6 @@ package networktopology import ( "context" "encoding/json" - "errors" - "strconv" "time" "github.com/go-redis/redis/v8" @@ -32,11 +30,7 @@ import ( "d7y.io/dragonfly/v2/scheduler/resource" ) -const ( - // DefaultMovingAverageWeight is the weight of the moving average. - DefaultMovingAverageWeight = 0.1 -) - +// Probe is the probe metadata. type Probe struct { // Host metadata. Host *resource.Host `json:"host"` @@ -57,40 +51,42 @@ func NewProbe(host *resource.Host, rtt time.Duration, createdAt time.Time) *Prob } } +// Probes is the interface to store probes. type Probes interface { // Peek returns the oldest probe without removing it. - Peek() (*Probe, bool) + Peek() (*Probe, error) // Enqueue enqueues probe into the queue. Enqueue(*Probe) error // Dequeue removes and returns the oldest probe. - Dequeue() (*Probe, bool) + Dequeue() (*Probe, error) // Length gets the length of probes. - Length() int64 + Length() (int64, error) // CreatedAt is the creation time of probes. - CreatedAt() time.Time + CreatedAt() (time.Time, error) // UpdatedAt is the updated time to store probe. - UpdatedAt() time.Time + UpdatedAt() (time.Time, error) // AverageRTT is the average round-trip time of probes. - AverageRTT() time.Duration + AverageRTT() (time.Duration, error) } +// probes is the implementation of Probes. type probes struct { // config is the probe config. config config.ProbeConfig - // Redis universal client interface. + // rdb is redis universal client interface. rdb redis.UniversalClient - // src is the source host id. + // srcHostID is the source host id. srcHostID string - // dest is the destination host id. + // destHostID is the destination host id. destHostID string } @@ -105,145 +101,73 @@ func NewProbes(cfg config.ProbeConfig, rdb redis.UniversalClient, srcHostID stri } // Peek returns the oldest probe without removing it. -func (p *probes) Peek() (*Probe, bool) { - str, err := p.rdb.LIndex(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0).Result() +func (p *probes) Peek() (*Probe, error) { + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + + rawProbe, err := p.rdb.LIndex(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID), 0).Bytes() if err != nil { - return nil, false + return nil, err } probe := &Probe{} - if err = json.Unmarshal([]byte(str), probe); err != nil { - return nil, false + if err = json.Unmarshal(rawProbe, probe); err != nil { + return nil, err } - return probe, true + return probe, err } +// TODO Implement function. // Enqueue enqueues probe into the queue. func (p *probes) Enqueue(probe *Probe) error { - length := p.Length() - if length == int64(p.config.QueueLength) { - if _, ok := p.Dequeue(); !ok { - return errors.New("remove the oldest probe error") - } - } - - probesKey := pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID) - data, err := json.Marshal(probe) - if err != nil { - return err - } - - if err = p.rdb.RPush(context.Background(), probesKey, data).Err(); err != nil { - return err - } - - networkTopologyKey := pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID) - if length == 0 { - if _, err := p.rdb.Pipelined(context.Background(), func(rdb redis.Pipeliner) error { - rdb.HSet(context.Background(), networkTopologyKey, "averageRTT", probe.RTT.Nanoseconds()) - rdb.HSet(context.Background(), networkTopologyKey, "createdAt", probe.CreatedAt.UnixNano()) - rdb.HSet(context.Background(), networkTopologyKey, "updatedAt", probe.CreatedAt.UnixNano()) - return nil - }); err != nil { - return err - } - - return nil - } - values, err := p.rdb.LRange(context.Background(), probesKey, 0, -1).Result() - if err != nil { - return err - } - - var averageRTT time.Duration - for _, value := range values { - probe := &Probe{} - if err = json.Unmarshal([]byte(value), probe); err != nil { - return err - } - - averageRTT = time.Duration(float64(averageRTT)*DefaultMovingAverageWeight + - float64(probe.RTT)*(1-DefaultMovingAverageWeight)) - } - - if _, err := p.rdb.Pipelined(context.Background(), func(rdb redis.Pipeliner) error { - rdb.HSet(context.Background(), networkTopologyKey, "averageRTT", averageRTT.Nanoseconds()) - rdb.HSet(context.Background(), networkTopologyKey, "updatedAt", probe.CreatedAt.UnixNano()) - return nil - }); err != nil { - return err - } - return nil } // Dequeue removes and returns the oldest probe. -func (p *probes) Dequeue() (*Probe, bool) { - str, err := p.rdb.LPop(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result() +func (p *probes) Dequeue() (*Probe, error) { + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + + rawProbe, err := p.rdb.LPop(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Bytes() if err != nil { - return nil, false + return nil, err } probe := &Probe{} - if err = json.Unmarshal([]byte(str), probe); err != nil { - return nil, false + if err = json.Unmarshal(rawProbe, probe); err != nil { + return nil, err } - return probe, true + return probe, nil } // Length gets the length of probes. -func (p *probes) Length() int64 { - length, err := p.rdb.LLen(context.Background(), pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result() - if err != nil { - return 0 - } +func (p *probes) Length() (int64, error) { + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() - return length + return p.rdb.LLen(ctx, pkgredis.MakeProbesKeyInScheduler(p.srcHostID, p.destHostID)).Result() } // CreatedAt is the creation time of probes. -func (p *probes) CreatedAt() time.Time { - value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "createdAt").Result() - if err != nil { - return time.Time{} - } +func (p *probes) CreatedAt() (time.Time, error) { + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() - nano, err := strconv.ParseInt(value, 10, 64) - if err != nil { - return time.Time{} - } - - return time.Unix(0, nano) + return p.rdb.HGet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "createdAt").Time() } // UpdatedAt is the updated time to store probe. -func (p *probes) UpdatedAt() time.Time { - value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt").Result() - if err != nil { - return time.Time{} - } +func (p *probes) UpdatedAt() (time.Time, error) { + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() - nano, err := strconv.ParseInt(value, 10, 64) - if err != nil { - return time.Time{} - } - - return time.Unix(0, nano) + return p.rdb.HGet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt").Time() } +// TODO Implement function. // AverageRTT is the average round-trip time of probes. -func (p *probes) AverageRTT() time.Duration { - value, err := p.rdb.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "averageRTT").Result() - if err != nil { - return time.Duration(0) - } - - nano, err := strconv.ParseInt(value, 10, 64) - if err != nil { - return time.Duration(0) - } - - return time.Duration(nano) +func (p *probes) AverageRTT() (time.Duration, error) { + return time.Duration(0), nil }