Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add scan function in redis and neighbours function in network topology #3048

Merged
merged 22 commits into from
Feb 2, 2024
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/docker/go-units v0.4.0
github.com/gaius-qi/ping v1.0.0
github.com/gammazero/deque v0.2.1
github.com/gin-contrib/gzip v0.0.6
github.com/gin-contrib/static v0.0.1
github.com/gin-contrib/zap v0.2.0
github.com/gin-gonic/gin v1.9.1
Expand Down Expand Up @@ -127,7 +128,6 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/gzip v0.0.6 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-echarts/go-echarts/v2 v2.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
15 changes: 15 additions & 0 deletions scheduler/networktopology/mocks/network_topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type NetworkTopology interface {
// ProbedCount is the number of times the host has been probed.
ProbedCount(string) (uint64, error)

// Neighbours gets the specified number of neighbors for root node.
Neighbours(root *resource.Host, number int) ([]*resource.Host, error)
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved

// Snapshot writes the current network topology to the storage.
Snapshot() error
}
Expand Down Expand Up @@ -321,6 +324,39 @@ func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
return probedCount, nil
}

// Neighbours gets the specified number of neighbors for root node.
func (nt *networkTopology) Neighbours(root *resource.Host, number int) ([]*resource.Host, error) {
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()

var neighbours []*resource.Host
networkTopologyKeys, _, err := nt.rdb.Scan(ctx, 0, pkgredis.MakeNetworkTopologyKeyInScheduler(root.ID, "*"), math.MaxInt64).Result()
fcgxz2003 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return neighbours, err
}

for _, networkTopologyKey := range networkTopologyKeys {
_, _, _, neighbourID, err := pkgredis.ParseNetworkTopologyKeyInScheduler(networkTopologyKey)
if err != nil {
logger.Error(err)
continue
}

neighbour, loaded := nt.resource.HostManager().Load(neighbourID)
if !loaded {
logger.Errorf("host %s not found", neighbourID)
continue
}
neighbours = append(neighbours, neighbour)

if len(neighbours) >= number {
break
}
}

return neighbours, nil
}

// Snapshot writes the current network topology to the storage.
func (nt *networkTopology) Snapshot() error {
ctx, cancel := context.WithTimeout(context.Background(), snapshotContextTimeout)
Expand Down
124 changes: 124 additions & 0 deletions scheduler/networktopology/network_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,130 @@ func TestNetworkTopology_ProbedCount(t *testing.T) {
}
}

func TestNetworkTopology_Neighbours(t *testing.T) {
tests := []struct {
name string
mock func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, mockRDBClient redismock.ClientMock)
expect func(t *testing.T, networkTopology NetworkTopology, err error)
}{
{
name: "get neighbour success",
mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, mockRDBClient redismock.ClientMock) {
mockRDBClient.MatchExpectationsInOrder(true)
mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal(
[]string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0)
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
)
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
neighbours, err := networkTopology.Neighbours(mockSeedHost, 1)
assert.NoError(err)
assert.Equal(len(neighbours), 1)
},
},
{
name: "get neighbour keys error",
mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, mockRDBClient redismock.ClientMock) {
mockRDBClient.MatchExpectationsInOrder(true)
mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetErr(
errors.New("get neighbour keys error"))
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
_, err = networkTopology.Neighbours(mockSeedHost, 1)
assert.EqualError(err, "get neighbour keys error")
},
},
{
name: "parse network topology key error",
mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, mockRDBClient redismock.ClientMock) {
mockRDBClient.MatchExpectationsInOrder(true)
mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal(
[]string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)}, 0)
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockHost.ID)).Return(nil, false),
)
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
neighbours, err := networkTopology.Neighbours(mockSeedHost, 1)
assert.NoError(err)
assert.Equal(len(neighbours), 0)
},
},
{
name: "load host error",
mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, mockRDBClient redismock.ClientMock) {
mockRDBClient.MatchExpectationsInOrder(true)
mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal(
[]string{"foo"}, 0)
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
neighbours, err := networkTopology.Neighbours(mockSeedHost, 1)
assert.NoError(err)
assert.Equal(len(neighbours), 0)
},
},
{
name: "neighbors number is greater than the required number",
mock: func(mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, mockRDBClient redismock.ClientMock) {
mockRDBClient.MatchExpectationsInOrder(true)
mockRDBClient.ExpectScan(0, pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "*"), math.MaxInt64).SetVal(
[]string{pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID),
pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "bar"),
pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, "baz")}, 0)
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq("bar")).Return(&resource.Host{ID: "bar"}, true).Times(1),
)
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
neighbours, err := networkTopology.Neighbours(mockSeedHost, 2)
assert.NoError(err)
assert.Equal(len(neighbours), 2)
assert.Equal(neighbours[0].ID, mockHost.ID)
assert.Equal(neighbours[1].ID, "bar")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()

rdb, mockRDBClient := redismock.NewClientMock()
res := resource.NewMockResource(ctl)
hostManager := resource.NewMockHostManager(ctl)
cache := cache.NewMockCache(ctl)
storage := storagemocks.NewMockStorage(ctl)
tc.mock(res.EXPECT(), hostManager, hostManager.EXPECT(), mockRDBClient)

networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, cache, res, storage)
tc.expect(t, networkTopology, err)
mockRDBClient.ClearExpect()
})
}
}

func TestNetworkTopology_Snapshot(t *testing.T) {
tests := []struct {
name string
Expand Down