Skip to content

Commit

Permalink
devices_controller: Switched devices and routes from StateDB to StateDB2
Browse files Browse the repository at this point in the history
This commit switches the devices controller over from the old StateDB
to the new StateDB2 implementation.

Signed-off-by: Dylan Reimerink <dylan.reimerink@isovalent.com>
  • Loading branch information
dylandreimerink authored and joestringer committed Aug 22, 2023
1 parent a845cdf commit 8abf620
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 189 deletions.
9 changes: 6 additions & 3 deletions pkg/datapath/linux/devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func (dm *DeviceManager) Detect(k8sEnabled bool) ([]string, error) {
return nil, nil
}

devs, _ := tables.SelectedDevices(dm.params.DeviceTable.Reader(dm.params.DB.ReadTxn()))
rxn := dm.params.DB.ReadTxn()
devs, _ := tables.SelectedDevices(dm.params.DeviceTable, rxn)
names := tables.DeviceNames(devs)

if len(names) == 0 && hasWildcard {
Expand Down Expand Up @@ -110,8 +111,10 @@ func (dm *DeviceManager) Listen(ctx context.Context) (chan []string, error) {

prevDevices := dm.initialDevices
for {
iter, invalidated := tables.SelectedDevices(dm.params.DeviceTable.Reader(dm.params.DB.ReadTxn()))
newDevices := tables.DeviceNames(iter)
rxn := dm.params.DB.ReadTxn()
devices, invalidated := tables.SelectedDevices(dm.params.DeviceTable, rxn)
newDevices := tables.DeviceNames(devices)

if slices.Equal(prevDevices, newDevices) {
continue
}
Expand Down
84 changes: 41 additions & 43 deletions pkg/datapath/linux/devices_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/cilium/cilium/pkg/inctimer"
"github.com/cilium/cilium/pkg/ip"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/statedb"
"github.com/cilium/cilium/pkg/statedb2"
)

// DevicesControllerCell registers a controller that subscribes to network devices
Expand Down Expand Up @@ -76,9 +76,9 @@ type devicesControllerParams struct {

Config DevicesConfig
Log logrus.FieldLogger
DB statedb.DB
DeviceTable statedb.Table[*tables.Device]
RouteTable statedb.Table[*tables.Route]
DB *statedb2.DB
DeviceTable statedb2.Table[*tables.Device]
RouteTable statedb2.Table[*tables.Route]

// netlinkFuncs is optional and used by tests to verify error handling behavior.
NetlinkFuncs *netlinkFuncs `optional:"true"`
Expand Down Expand Up @@ -260,22 +260,20 @@ func (dc *devicesController) initialize() error {
})
}

txn := dc.params.DB.WriteTxn()
txn := dc.params.DB.WriteTxn(dc.params.DeviceTable, dc.params.RouteTable)

// Flush existing data from potential prior run.
dc.params.DeviceTable.Writer(txn).DeleteAll(statedb.All)
dc.params.RouteTable.Writer(txn).DeleteAll(statedb.All)
dc.params.DeviceTable.DeleteAll(txn)
dc.params.RouteTable.DeleteAll(txn)

// Process the initial batch.
dc.processBatch(txn, batch)

iter, _ := tables.SelectedDevices(dc.params.DeviceTable.Reader(txn))
names := tables.DeviceNames(iter)
devs, _ := tables.SelectedDevices(dc.params.DeviceTable, txn)
names := tables.DeviceNames(devs)
dc.log.WithField(logfields.Devices, names).Info("Detected initial devices")

if err := txn.Commit(); err != nil {
return fmt.Errorf("failed to commit initial batch: %w", err)
}
txn.Commit()

select {
case <-dc.initialized:
Expand Down Expand Up @@ -332,13 +330,10 @@ func (dc *devicesController) processUpdates(

case <-ticker.C:
if len(batch) > 0 {
txn := dc.params.DB.WriteTxn()
txn := dc.params.DB.WriteTxn(dc.params.DeviceTable, dc.params.RouteTable)
dc.processBatch(txn, batch)
if err := txn.Commit(); err != nil {
dc.log.WithError(err).Warnf("Failed to commit devices and routes, retrying later")
} else {
batch = map[int][]any{}
}
txn.Commit()
batch = map[int][]any{}
}
}
}
Expand Down Expand Up @@ -368,15 +363,9 @@ func populateFromLink(d *tables.Device, link netlink.Link) {
// processBatch processes a batch of address, link and route updates.
// The address and link updates are merged into a device object and upserted
// into the device table.
func (dc *devicesController) processBatch(txn statedb.WriteTransaction, batch map[int][]any) {
devicesWriter := dc.params.DeviceTable.Writer(txn)
routesWriter := dc.params.RouteTable.Writer(txn)

func (dc *devicesController) processBatch(txn statedb2.WriteTxn, batch map[int][]any) {
for index, updates := range batch {
d, err := devicesWriter.First(tables.DeviceByIndex(index))
if err != nil {
panic("BUG: DeviceByIndex is broken")
}
d, _, _ := dc.params.DeviceTable.First(txn, tables.DeviceIDIndex.Query(index))
if d == nil {
// Unseen device. We may receive address updates before link updates
// and thus the only thing we know at this point is the index.
Expand Down Expand Up @@ -414,9 +403,15 @@ func (dc *devicesController) processBatch(txn statedb.WriteTransaction, batch ma
r.Gw, _ = netip.AddrFromSlice(u.Gw)

if u.Type == unix.RTM_NEWROUTE {
routesWriter.Insert(&r)
_, _, err := dc.params.RouteTable.Insert(txn, &r)
if err != nil {
dc.log.WithError(err).WithField(logfields.Route, r).Warn("Failed to insert route")
}
} else {
routesWriter.Delete(&r)
_, _, err := dc.params.RouteTable.Delete(txn, &r)
if err != nil {
dc.log.WithError(err).WithField(logfields.Route, r).Warn("Failed to delete route")
}
}
case netlink.LinkUpdate:
if u.Header.Type == unix.RTM_DELLINK {
Expand All @@ -433,13 +428,16 @@ func (dc *devicesController) processBatch(txn statedb.WriteTransaction, batch ma
if deviceDeleted {
// Remove the deleted device. The routes table will be cleaned up from the
// route updates.
devicesWriter.DeleteAll(tables.DeviceByIndex(index))
dc.params.DeviceTable.Delete(txn, d)
} else {
// Recheck the viability of the device after the updates have been applied.
d.Selected = dc.isSelectedDevice(d, routesWriter) && len(d.Addrs) > 0
d.Selected = dc.isSelectedDevice(d, txn) && len(d.Addrs) > 0

// Create or update the device.
devicesWriter.Insert(d)
_, _, err := dc.params.DeviceTable.Insert(txn, d)
if err != nil {
dc.log.WithError(err).WithField(logfields.Device, d).Warn("Failed to insert route")
}
}
}
}
Expand All @@ -454,7 +452,7 @@ const (

// isSelectedDevice checks if the device is selected or not. We still maintain its state in
// case it later becomes selected.
func (dc *devicesController) isSelectedDevice(d *tables.Device, routes statedb.TableReader[*tables.Route]) bool {
func (dc *devicesController) isSelectedDevice(d *tables.Device, txn statedb2.WriteTxn) bool {
if d.Name == "" {
// Looks like we have seen the addresses for this device before the initial link update,
// hence it has no name. Definitely not selected yet!
Expand Down Expand Up @@ -513,7 +511,7 @@ func (dc *devicesController) isSelectedDevice(d *tables.Device, routes statedb.T
// the device manually).
// This is a workaround for kubernetes-in-docker. We want to avoid
// veth devices in general as they may be leftovers from another CNI.
if !dc.filter.nonEmpty() && !tables.HasDefaultRoute(routes, d.Index) {
if !dc.filter.nonEmpty() && !tables.HasDefaultRoute(dc.params.RouteTable, txn, d.Index) {
log.Debug("Not selecting veth device as it has no default route")
return false
}
Expand All @@ -526,7 +524,7 @@ func (dc *devicesController) isSelectedDevice(d *tables.Device, routes statedb.T
return false
}

if !hasGlobalRoute(d.Index, routes) {
if !hasGlobalRoute(d.Index, dc.params.RouteTable, txn) {
log.Debug("Not selecting device as it has no global unicast routes")
return false
}
Expand All @@ -536,17 +534,17 @@ func (dc *devicesController) isSelectedDevice(d *tables.Device, routes statedb.T
return true
}

func hasGlobalRoute(devIndex int, routes statedb.TableReader[*tables.Route]) bool {
iter, err := routes.Get(tables.RouteByLinkIndex(devIndex))
if err != nil {
panic("BUG: RouteByLinkIndex is broken")
}
for route, ok := iter.Next(); ok; route, ok = iter.Next() {
if route.Dst.Addr().IsGlobalUnicast() {
return true
func hasGlobalRoute(devIndex int, tbl statedb2.Table[*tables.Route], rxn statedb2.ReadTxn) bool {
iter, _ := tbl.Get(rxn, tables.RouteLinkIndex.Query(devIndex))
hasGlobal := false
for r, _, ok := iter.Next(); ok; r, _, ok = iter.Next() {
if r.Dst.Addr().IsGlobalUnicast() {
hasGlobal = true
break
}
}
return false

return hasGlobal
}

// deviceFilter implements filtering device names either by
Expand Down
53 changes: 26 additions & 27 deletions pkg/datapath/linux/devices_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/statedb"
"github.com/cilium/cilium/pkg/statedb2"
"github.com/cilium/cilium/pkg/testutils"
)

Expand Down Expand Up @@ -203,12 +203,12 @@ func TestDevicesController(t *testing.T) {
withFreshNetNS(t, func(ns netns.NsHandle) {

var (
db statedb.DB
devicesTable statedb.Table[*tables.Device]
routesTable statedb.Table[*tables.Route]
db *statedb2.DB
devicesTable statedb2.Table[*tables.Device]
routesTable statedb2.Table[*tables.Route]
)
h := hive.New(
statedb.Cell,
statedb2.Cell,
tables.Cell,
DevicesControllerCell,
cell.Provide(func() (*netlinkFuncs, error) {
Expand All @@ -221,7 +221,7 @@ func TestDevicesController(t *testing.T) {
return DevicesConfig{}
}),

cell.Invoke(func(db_ statedb.DB, devicesTable_ statedb.Table[*tables.Device], routesTable_ statedb.Table[*tables.Route]) {
cell.Invoke(func(db_ *statedb2.DB, devicesTable_ statedb2.Table[*tables.Device], routesTable_ statedb2.Table[*tables.Route]) {
db = db_
devicesTable = devicesTable_
routesTable = routesTable_
Expand All @@ -239,22 +239,21 @@ func TestDevicesController(t *testing.T) {
// Get the new set of devices
for {
txn := db.ReadTxn()
devs, devsInvalidated := tables.SelectedDevices(devicesTable.Reader(txn))
devs, devsInvalidated := tables.SelectedDevices(devicesTable, txn)

routesIter, err := routesTable.Reader(txn).Get(statedb.All)
require.NoError(t, err)
routes := statedb.Collect[*tables.Route](routesIter)
routesIter, routesIterInvalidated := routesTable.All(txn)
routes := statedb2.Collect(routesIter)

if step.check(t, devs, routes) {
break
}

// Wait for changes and try again.
select {
case <-routesIter.Invalidated():
case <-routesIterInvalidated:
case <-devsInvalidated:
case <-ctx.Done():
db.WriteJSON(os.Stdout)
txn.WriteJSON(os.Stdout)
t.Fatalf("Test case %q timed out while waiting for devices. Last devices seen: %+v", step.name, devs)
}
}
Expand All @@ -281,11 +280,11 @@ func TestDevicesController_Wildcards(t *testing.T) {
withFreshNetNS(t, func(ns netns.NsHandle) {

var (
db statedb.DB
devicesTable statedb.Table[*tables.Device]
db *statedb2.DB
devicesTable statedb2.Table[*tables.Device]
)
h := hive.New(
statedb.Cell,
statedb2.Cell,
tables.Cell,
DevicesControllerCell,
cell.Provide(func() DevicesConfig {
Expand All @@ -294,7 +293,7 @@ func TestDevicesController_Wildcards(t *testing.T) {
}
}),
cell.Provide(func() (*netlinkFuncs, error) { return makeNetlinkFuncs(ns) }),
cell.Invoke(func(db_ statedb.DB, devicesTable_ statedb.Table[*tables.Device]) {
cell.Invoke(func(db_ *statedb2.DB, devicesTable_ statedb2.Table[*tables.Device]) {
db = db_
devicesTable = devicesTable_
}))
Expand All @@ -305,8 +304,8 @@ func TestDevicesController_Wildcards(t *testing.T) {
require.NoError(t, createDummy("nonviable", "192.168.1.1/24", false))

for {
devices := devicesTable.Reader(db.ReadTxn())
devs, invalidated := tables.SelectedDevices(devices)
rxn := db.ReadTxn()
devs, invalidated := tables.SelectedDevices(devicesTable, rxn)

if len(devs) == 1 && devs[0].Name == "dummy0" {
break
Expand All @@ -330,8 +329,8 @@ func TestDevicesController_Restarts(t *testing.T) {
defer cancel()

var (
db statedb.DB
devicesTable statedb.Table[*tables.Device]
db *statedb2.DB
devicesTable statedb2.Table[*tables.Device]
)

logging.SetLogLevelToDebug()
Expand Down Expand Up @@ -436,12 +435,12 @@ func TestDevicesController_Restarts(t *testing.T) {
}

h := hive.New(
statedb.Cell,
statedb2.Cell,
tables.Cell,
DevicesControllerCell,
cell.Provide(func() DevicesConfig { return DevicesConfig{} }),
cell.Provide(func() *netlinkFuncs { return &funcs }),
cell.Invoke(func(db_ statedb.DB, devicesTable_ statedb.Table[*tables.Device]) {
cell.Invoke(func(db_ *statedb2.DB, devicesTable_ statedb2.Table[*tables.Device]) {
db = db_
devicesTable = devicesTable_
}))
Expand All @@ -450,9 +449,9 @@ func TestDevicesController_Restarts(t *testing.T) {
assert.NoError(t, err)

for {
iter, err := devicesTable.Reader(db.ReadTxn()).Get(statedb.All)
require.NoError(t, err)
devs := statedb.Collect[*tables.Device](iter)
rxn := db.ReadTxn()
iter, invalidated := devicesTable.All(rxn)
devs := statedb2.Collect(iter)

// We expect the 'stale' device to have been flushed by the restart
// and for the 'dummy' to have appeared.
Expand All @@ -462,9 +461,9 @@ func TestDevicesController_Restarts(t *testing.T) {

select {
case <-ctx.Done():
db.WriteJSON(os.Stdout)
rxn.WriteJSON(os.Stdout)
t.Fatalf("Test timed out while waiting for device, last seen: %v", devs)
case <-iter.Invalidated():
case <-invalidated:
}
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/datapath/linux/devices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"net"
"runtime"
"sort"
"time"

. "github.com/cilium/checkmate"
Expand All @@ -24,7 +25,7 @@ import (
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/node"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/statedb"
"github.com/cilium/cilium/pkg/statedb2"
"github.com/cilium/cilium/pkg/testutils"
)

Expand Down Expand Up @@ -245,6 +246,7 @@ func (s *DevicesSuite) TestDetect(c *C) {
c.Assert(err, IsNil)
devices, err = dm.Detect(true)
c.Assert(err, IsNil)
sort.Strings(devices)
c.Assert(devices, checker.DeepEquals, []string{"bond0", "dummy0", "dummy1", "dummy_v6", "veth0"})
option.Config.SetDevices([]string{})
dm.Stop()
Expand Down Expand Up @@ -658,7 +660,7 @@ func delRoutes(iface string) error {
func newDeviceManagerForTests() (dm *DeviceManager, err error) {
ns, _ := netns.Get()
h := hive.New(
statedb.Cell,
statedb2.Cell,
tables.Cell,
DevicesControllerCell,
cell.Provide(func() DevicesConfig {
Expand Down

0 comments on commit 8abf620

Please sign in to comment.