Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add scan function in redis and neighbours function in network topology #3048

Merged
merged 22 commits into from
Feb 2, 2024
Merged
26 changes: 26 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/fs"
"os"
"path/filepath"
"regexp"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -56,6 +57,7 @@ const (
)

type Cache interface {
Scan(m string, n int) ([]string, error)
Set(k string, x any, d time.Duration)
SetDefault(k string, x any)
Add(k string, x any, d time.Duration) error
Expand All @@ -82,6 +84,30 @@ type cache struct {
janitor *janitor
}

// Scan all items to get a specified number of matching regexp key.
func (c *cache) Scan(m string, n int) ([]string, error) {
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved
c.mu.RLock()
defer c.mu.RUnlock()

regex, err := regexp.Compile(m)
if err != nil {
return nil, err
}

keys := make([]string, 0, n)
for item := range c.items {
if len(keys) >= n {
break
}

if regex.MatchString(item) {
keys = append(keys, item)
}
}

return keys, nil
}

// Add an item to the cache, replacing any existing item. If the duration is 0
// (DefaultExpiration), the cache's default expiration time is used. If it is -1
// (NoExpiration), the item never expires.
Expand Down
13 changes: 13 additions & 0 deletions pkg/cache/cache_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,61 @@ func TestStorePointerToStruct(t *testing.T) {
}
}

func TestScan(t *testing.T) {
tc := New(DefaultExpiration, 0)
tc.Set(v2, v1, DefaultExpiration)
tc.Set(v3, v1, DefaultExpiration)
keys, err := tc.Scan("^b", 1)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 1 {
t.Error("Invalid number of scaning cache keys")
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved
}

keys, err = tc.Scan("^b", 2)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 2 {
t.Error("Invalid number of scaning cache keys")
}

keys, err = tc.Scan("^b", 4)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 2 {
t.Error("Invalid number of scaning cache keys")
}

keys, err = tc.Scan("^ba", 2)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 2 {
t.Error("Invalid number of scaning cache keys")
}

keys, err = tc.Scan("^a", 2)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 0 {
t.Error("Invalid number of scaning cache keys")
}

_, err = tc.Scan("(", 2)
if err == nil {
t.Error("Parse a fault regular expression")
}
}

func TestAdd(t *testing.T) {
tc := New(DefaultExpiration, 0)
err := tc.Add(v1, v2, DefaultExpiration)
Expand Down
15 changes: 15 additions & 0 deletions scheduler/networktopology/mocks/network_topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 68 additions & 7 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package networktopology
import (
"context"
"errors"
"math"
"sort"
"strconv"
"time"
Expand All @@ -47,6 +46,9 @@ const (

// findProbedCandidateHostsLimit is the limit of find probed candidate hosts.
findProbedCandidateHostsLimit = 50

// defaultScanCountLimit is the predefined amount of work performed with each 'Scan' operation called when retrieve elements from Redis.
defaultScanCountLimit = 64
)

// NetworkTopology is an interface for network topology.
Expand Down Expand Up @@ -76,6 +78,11 @@ type NetworkTopology interface {
// ProbedCount is the number of times the host has been probed.
ProbedCount(string) (uint64, error)

// Neighbours gets the specified number neighbors of source host for aggregation, by regexp scaning cache
// (if it is not enough for code to work, access redis to get neighbors), then parsing keys and loading host,
// while updating the cache data.
Neighbours(*resource.Host, int) ([]*resource.Host, error)

// Snapshot writes the current network topology to the storage.
Snapshot() error
}
Expand Down Expand Up @@ -255,25 +262,25 @@ func (nt *networkTopology) DeleteHost(hostID string) error {
defer cancel()

deleteKeys := []string{pkgredis.MakeProbedCountKeyInScheduler(hostID)}
srcNetworkTopologyKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*"), math.MaxInt64).Result()
srcNetworkTopologyKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*"), defaultScanCountLimit).Result()
if err != nil {
return err
}
deleteKeys = append(deleteKeys, srcNetworkTopologyKeys...)

destNetworkTopologyKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler("*", hostID), math.MaxInt64).Result()
destNetworkTopologyKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler("*", hostID), defaultScanCountLimit).Result()
if err != nil {
return err
}
deleteKeys = append(deleteKeys, destNetworkTopologyKeys...)

srcProbesKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeProbesKeyInScheduler(hostID, "*"), math.MaxInt64).Result()
srcProbesKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeProbesKeyInScheduler(hostID, "*"), defaultScanCountLimit).Result()
if err != nil {
return err
}
deleteKeys = append(deleteKeys, srcProbesKeys...)

destProbesKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeProbesKeyInScheduler("*", hostID), math.MaxInt64).Result()
destProbesKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeProbesKeyInScheduler("*", hostID), defaultScanCountLimit).Result()
if err != nil {
return err
}
Expand Down Expand Up @@ -321,14 +328,68 @@ func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
return probedCount, nil
}

// Neighbours gets the specified number neighbors of source host for aggregation, by regexp scaning cache
// (if it is not enough for code to work, access redis to get neighbors), then parsing keys and loading host,
// while updating the cache data.
func (nt *networkTopology) Neighbours(srcHost *resource.Host, n int) ([]*resource.Host, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

networkTopologyKeys, err := nt.cache.Scan(pkgredis.MakeNetworkTopologyKeyInScheduler(srcHost.ID, "*"), n)
if err != nil {
return nil, err
}

// If it is not enough for code to work, access redis to get neighbors.
if len(networkTopologyKeys) < n {
networkTopologyKeys, _, err = nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler(srcHost.ID, "*"), defaultScanCountLimit).Result()
if err != nil {
return nil, err
}

var networkTopology map[string]string
for _, networkTopologyKey := range networkTopologyKeys {
if networkTopology, err = nt.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil {
logger.Error(err)
continue
}

// Add cache data.
nt.cache.Set(networkTopologyKey, networkTopology, nt.config.Cache.TTL)
}
}

neighbours := make([]*resource.Host, 0, n)
for _, networkTopologyKey := range networkTopologyKeys {
if len(neighbours) >= n {
break
}

_, _, _, neighbourID, err := pkgredis.ParseNetworkTopologyKeyInScheduler(networkTopologyKey)
if err != nil {
logger.Error(err)
continue
}

neighbour, loaded := nt.resource.HostManager().Load(neighbourID)
if !loaded {
logger.Errorf("host %s not found", neighbourID)
continue
}
neighbours = append(neighbours, neighbour)
}

return neighbours, nil
}

// Snapshot writes the current network topology to the storage.
func (nt *networkTopology) Snapshot() error {
ctx, cancel := context.WithTimeout(context.Background(), snapshotContextTimeout)
defer cancel()

now := time.Now()
id := uuid.NewString()
probedCountKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeProbedCountKeyInScheduler("*"), math.MaxInt64).Result()
probedCountKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeProbedCountKeyInScheduler("*"), defaultScanCountLimit).Result()
if err != nil {
return err
}
Expand All @@ -341,7 +402,7 @@ func (nt *networkTopology) Snapshot() error {
}

// Construct destination hosts for network topology.
networkTopologyKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, "*"), math.MaxInt64).Result()
networkTopologyKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, "*"), defaultScanCountLimit).Result()
if err != nil {
logger.Error(err)
continue
Expand Down