Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add scan function in redis and neighbours function in network topology #3048

Merged
merged 22 commits into from
Feb 2, 2024
Merged
26 changes: 26 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/fs"
"os"
"path/filepath"
"regexp"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -56,6 +57,7 @@ const (
)

type Cache interface {
Scan(p string, n int) ([]string, error)
Set(k string, x any, d time.Duration)
SetDefault(k string, x any)
Add(k string, x any, d time.Duration) error
Expand All @@ -82,6 +84,30 @@ type cache struct {
janitor *janitor
}

// Scan all items to get a specified number of matching regex keys.
func (c *cache) Scan(p string, n int) ([]string, error) {
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved
c.mu.RLock()
defer c.mu.RUnlock()

var keys []string
reg, err := regexp.Compile(p)
if err != nil {
return nil, err
}

for k := range c.items {
if reg.MatchString(k) {
keys = append(keys, k)
}

if len(keys) >= n {
break
}
}

return keys, nil
}

// Add an item to the cache, replacing any existing item. If the duration is 0
// (DefaultExpiration), the cache's default expiration time is used. If it is -1
// (NoExpiration), the item never expires.
Expand Down
13 changes: 13 additions & 0 deletions pkg/cache/cache_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,56 @@ func TestStorePointerToStruct(t *testing.T) {
}
}

func TestScan(t *testing.T) {
tc := New(DefaultExpiration, 0)
tc.Set(v2, v1, DefaultExpiration)
tc.Set(v3, v1, DefaultExpiration)
keys, err := tc.Scan("^b", 1)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 1 {
t.Error("invalid number of scaning cache keys")
}

keys, err = tc.Scan("^b", 2)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 2 {
t.Error("invalid number of scaning cache keys")
}

keys, err = tc.Scan("^b", 4)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 2 {
t.Error("invalid number of scaning cache keys")
}

keys, err = tc.Scan("^ba", 2)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 2 {
t.Error("invalid number of scaning cache keys")
}

keys, err = tc.Scan("^a", 2)
if err != nil {
t.Error("Couldn't parse a regular expression and returns")
}

if len(keys) != 0 {
t.Error("invalid number of scaning cache keys")
}
}

func TestAdd(t *testing.T) {
tc := New(DefaultExpiration, 0)
err := tc.Add(v1, v2, DefaultExpiration)
Expand Down
15 changes: 15 additions & 0 deletions scheduler/networktopology/mocks/network_topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type NetworkTopology interface {
// ProbedCount is the number of times the host has been probed.
ProbedCount(string) (uint64, error)

// Neighbours gets the specified number of neighbors for root node.
Neighbours(root *resource.Host, number int) ([]*resource.Host, error)
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved

// Snapshot writes the current network topology to the storage.
Snapshot() error
}
Expand Down Expand Up @@ -321,6 +324,58 @@ func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
return probedCount, nil
}

// Neighbours gets the specified number of neighbors for root node.
func (nt *networkTopology) Neighbours(root *resource.Host, number int) ([]*resource.Host, error) {
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

networkTopologyKeys, err := nt.cache.Scan(pkgredis.MakeNetworkTopologyKeyInScheduler(root.ID, "*"), number)
if err != nil {
return nil, err
}

// If we cannot get a sufficient number of neighbors from the cache, then we access redis.
if len(networkTopologyKeys) < number {
networkTopologyKeys, _, err = nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler(root.ID, "*"), math.MaxInt64).Result()
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

var networkTopology map[string]string
for _, networkTopologyKey := range networkTopologyKeys {
if networkTopology, err = nt.rdb.HGetAll(ctx, networkTopologyKey).Result(); err != nil {
logger.Error(err)
continue
}

// Add cache data.
nt.cache.Set(networkTopologyKey, networkTopology, nt.config.Cache.TTL)
}
}

var neighbours []*resource.Host
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved
for _, networkTopologyKey := range networkTopologyKeys {
_, _, _, neighbourID, err := pkgredis.ParseNetworkTopologyKeyInScheduler(networkTopologyKey)
if err != nil {
logger.Error(err)
continue
}

neighbour, loaded := nt.resource.HostManager().Load(neighbourID)
if !loaded {
logger.Errorf("host %s not found", neighbourID)
continue
}
neighbours = append(neighbours, neighbour)

if len(neighbours) >= number {
break
}
}

return neighbours, nil
}

// Snapshot writes the current network topology to the storage.
func (nt *networkTopology) Snapshot() error {
ctx, cancel := context.WithTimeout(context.Background(), snapshotContextTimeout)
Expand Down