Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node-neigh: Locking, logging, misc improvements #15783

Merged
merged 7 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Documentation/cmdref/cilium-agent.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func init() {
flags.Bool(option.AnnotateK8sNode, defaults.AnnotateK8sNode, "Annotate Kubernetes node")
option.BindEnv(option.AnnotateK8sNode)

flags.Duration(option.ARPPingRefreshPeriod, 5*time.Minute, "Period for remote node ARP entry refresh (set 0 to disable)")
option.BindEnv(option.ARPPingRefreshPeriod)

flags.Bool(option.AutoCreateCiliumNodeResource, defaults.AutoCreateCiliumNodeResource, "Automatically create CiliumNode resource for own node on startup")
option.BindEnv(option.AutoCreateCiliumNodeResource)

Expand Down Expand Up @@ -1686,7 +1689,7 @@ func runDaemon() {
}

// Start periodical arping to refresh neighbor table
if d.datapath.Node().NodeNeighDiscoveryEnabled() {
if d.datapath.Node().NodeNeighDiscoveryEnabled() && option.Config.ARPPingRefreshPeriod != 0 {
d.nodeDiscovery.Manager.StartNeighborRefresh(d.datapath.Node())
}

Expand Down
103 changes: 58 additions & 45 deletions pkg/datapath/linux/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"os"
"reflect"

"github.com/cilium/cilium/pkg/cidr"
"github.com/cilium/cilium/pkg/counter"
Expand Down Expand Up @@ -57,6 +58,7 @@ type linuxNodeHandler struct {
datapathConfig DatapathConfiguration
nodes map[nodeTypes.Identity]*nodeTypes.Node
enableNeighDiscovery bool
neighLock lock.Mutex // protects neigh* fields below
neighDiscoveryLink netlink.Link
neighNextHopByNode map[nodeTypes.Identity]string // val = string(net.IP)
neighNextHopRefCount counter.StringCounter
Expand Down Expand Up @@ -628,7 +630,7 @@ func (n *linuxNodeHandler) encryptNode(newNode *nodeTypes.Node) {

}

func (n *linuxNodeHandler) getSrcAndNextHopIPv4(nodeIPv4 net.IP) (srcIPv4, nextHopIPv4 net.IP, err error) {
func getSrcAndNextHopIPv4(nodeIPv4 net.IP) (srcIPv4, nextHopIPv4 net.IP, err error) {
// Figure out whether nodeIPv4 is directly reachable (i.e. in the same L2)
routes, err := netlink.RouteGet(nodeIPv4)
if err != nil {
Expand Down Expand Up @@ -666,46 +668,46 @@ func (n *linuxNodeHandler) getSrcAndNextHopIPv4(nodeIPv4 net.IP) (srcIPv4, nextH
// which tries to update ARP entries previously inserted by insertNeighbor(). In
// this case it does not bail out early if the ARP entry already exists, and
// sends the ARP request anyway.
//
// The method must be called with linuxNodeHandler.mutex held.
func (n *linuxNodeHandler) insertNeighbor(ctx context.Context, newNode *nodeTypes.Node, refresh bool) {
if newNode.IsLocal() {
var link netlink.Link
n.neighLock.Lock()
if n.neighDiscoveryLink == nil || reflect.ValueOf(n.neighDiscoveryLink).IsNil() {
brb marked this conversation as resolved.
Show resolved Hide resolved
n.neighLock.Unlock()
// Nothing to do - the discovery link was not set yet
return
}
link = n.neighDiscoveryLink
n.neighLock.Unlock()

newNodeIP := newNode.GetNodeIP(false).To4()
nextHopIPv4 := make(net.IP, len(newNodeIP))
copy(nextHopIPv4, newNodeIP)

scopedLog := log.WithFields(logrus.Fields{
logfields.LogSubsys: "node-neigh-debug",
logfields.Interface: n.neighDiscoveryLink.Attrs().Name,
logfields.Interface: link.Attrs().Name,
})

srcIPv4, nextHopIPv4, err := n.getSrcAndNextHopIPv4(nextHopIPv4)
srcIPv4, nextHopIPv4, err := getSrcAndNextHopIPv4(nextHopIPv4)
if err != nil {
scopedLog.WithError(err).Info("Unable to determine source and nexthop IP addr")
return
}
nextHopStr := nextHopIPv4.String()

scopedLog = scopedLog.WithField(logfields.IPAddr, nextHopIPv4)

nextHopStr := nextHopIPv4.String()
n.neighLock.Lock()

nextHopIsNew := false
if existingNextHopStr, found := n.neighNextHopByNode[newNode.Identity()]; found {
if existingNextHopStr == nextHopStr {
// We already know about the nextHop of the given newNode. Can happen
// when insertNeighbor is called by NodeUpdate multiple times for
// the same node.
if !refresh {
// In the case of refresh, don't return early, as we want to
// update the related neigh entry even if the nextHop is the same
// (e.g. to detect the GW MAC addr change).
return
}
} else if n.neighNextHopRefCount.Delete(existingNextHopStr) {
if existingNextHopStr != nextHopStr && n.neighNextHopRefCount.Delete(existingNextHopStr) {
// nextHop has changed and nobody else is using it, so remove the old one.
neigh, found := n.neighByNextHop[existingNextHopStr]
if found {
// Note that we don't move the removal via netlink, which might
// block, out of the hot path (e.g. with defer), as this case
// happens very rarely.
if err := netlink.NeighDel(neigh); err != nil {
scopedLog.WithFields(logrus.Fields{
logfields.IPAddr: neigh.IP,
Expand All @@ -719,25 +721,34 @@ func (n *linuxNodeHandler) insertNeighbor(ctx context.Context, newNode *nodeType
}
}
}
} else {
// nextHop for the given node was previously not found, so let's
// increment ref counter. This can happen upon regular NodeUpdate event
// or by the periodic ARP refresher which got executed before
// NodeUpdate().
nextHopIsNew = n.neighNextHopRefCount.Add(nextHopStr)
}

n.neighNextHopByNode[newNode.Identity()] = nextHopStr

nextHopIsNew := false
if !refresh {
nextHopIsNew = n.neighNextHopRefCount.Add(nextHopStr)
}
n.neighLock.Unlock() // to allow concurrent arpings below

// nextHop hasn't been arpinged before OR we are refreshing neigh entry
var hwAddr net.HardwareAddr
if nextHopIsNew || refresh {
hwAddr, err := arp.PingOverLink(n.neighDiscoveryLink, srcIPv4, nextHopIPv4)
hwAddr, err = arp.PingOverLink(link, srcIPv4, nextHopIPv4)
if err != nil {
scopedLog.WithError(err).Info("arping failed")
scopedLog.WithError(err).Debug("arping failed")
metrics.ArpingRequestsTotal.WithLabelValues(failed).Inc()
return
}
metrics.ArpingRequestsTotal.WithLabelValues(success).Inc()
}

n.neighLock.Lock()
brb marked this conversation as resolved.
Show resolved Hide resolved
defer n.neighLock.Unlock()

if hwAddr != nil {
if prevHwAddr, found := n.neighByNextHop[nextHopStr]; found && prevHwAddr.String() == hwAddr.String() {
// Nothing to update, return early to avoid calling to netlink. This
// is based on the assumption that n.neighByNextHop gets populated
Expand All @@ -746,18 +757,16 @@ func (n *linuxNodeHandler) insertNeighbor(ctx context.Context, newNode *nodeType
}

if option.Config.NodePortHairpin {
defer func() {
// Remove nextHopIPv4 entry in the neigh BPF map. Otherwise,
// we risk to silently blackhole packets instead of emitting
// DROP_NO_FIB if the netlink.NeighSet() below fails.
neighborsmap.NeighRetire(nextHopIPv4)
}()
// Remove nextHopIPv4 entry in the neigh BPF map. Otherwise,
// we risk silently blackholing packets instead of emitting
// DROP_NO_FIB if the netlink.NeighSet() below fails.
defer neighborsmap.NeighRetire(nextHopIPv4)
}

scopedLog = scopedLog.WithField(logfields.HardwareAddr, hwAddr)

neigh := netlink.Neigh{
LinkIndex: n.neighDiscoveryLink.Attrs().Index,
LinkIndex: link.Attrs().Index,
IP: nextHopIPv4,
HardwareAddr: hwAddr,
State: netlink.NUD_PERMANENT,
Expand All @@ -779,21 +788,21 @@ func (n *linuxNodeHandler) insertNeighbor(ctx context.Context, newNode *nodeType
func (n *linuxNodeHandler) refreshNeighbor(ctx context.Context, nodeToRefresh *nodeTypes.Node, completed chan struct{}) {
defer close(completed)

n.mutex.Lock()
defer n.mutex.Unlock()

n.insertNeighbor(ctx, nodeToRefresh, true)
}

// Must be called with linuxNodeHandler.mutex held.
func (n *linuxNodeHandler) deleteNeighbor(oldNode *nodeTypes.Node) {
n.neighLock.Lock()
defer n.neighLock.Unlock()

nextHopStr, found := n.neighNextHopByNode[oldNode.Identity()]
if !found {
return
}
defer func() { delete(n.neighNextHopByNode, oldNode.Identity()) }()

if n.neighNextHopRefCount.Delete(nextHopStr) {

neigh, found := n.neighByNextHop[nextHopStr]
delete(n.neighByNextHop, nextHopStr)

Expand Down Expand Up @@ -902,8 +911,12 @@ func (n *linuxNodeHandler) nodeUpdate(oldNode, newNode *nodeTypes.Node, firstAdd
newKey = newNode.EncryptionKey
}

if n.enableNeighDiscovery {
n.insertNeighbor(context.Background(), newNode, false)
if n.enableNeighDiscovery && !newNode.IsLocal() {
// Running insertNeighbor in a separate goroutine relies on the following
// assumptions:
// 1. newNode is accessed only by reads.
// 2. It is safe to invoke insertNeighbor concurrently for the same node.
go n.insertNeighbor(context.Background(), newNode, false)
brb marked this conversation as resolved.
Show resolved Hide resolved
}

if n.nodeConfig.EnableIPSec && !n.subnetEncryption() {
Expand Down Expand Up @@ -1009,7 +1022,7 @@ func (n *linuxNodeHandler) nodeDelete(oldNode *nodeTypes.Node) error {
}

if n.enableNeighDiscovery {
n.deleteNeighbor(oldNode)
go n.deleteNeighbor(oldNode)
}

if n.nodeConfig.EnableIPSec {
Expand Down Expand Up @@ -1349,7 +1362,11 @@ func (n *linuxNodeHandler) NodeConfigurationChanged(newConfig datapath.LocalNode
return fmt.Errorf("cannot find link by name %s for neigh discovery: %w",
ifaceName, err)
}
// neighDiscoveryLink can be accessed by a concurrent insertNeighbor
// goroutine.
n.neighLock.Lock()
n.neighDiscoveryLink = link
n.neighLock.Unlock()
}
}

Expand Down Expand Up @@ -1416,13 +1433,9 @@ func (n *linuxNodeHandler) NodeNeighborRefresh(ctx context.Context, nodeToRefres

refreshComplete := make(chan struct{})
go n.refreshNeighbor(ctx, &nodeToRefresh, refreshComplete)
for {
select {
case <-ctx.Done():
return
case <-refreshComplete:
return
}
select {
case <-ctx.Done():
case <-refreshComplete:
}
}

Expand Down
90 changes: 89 additions & 1 deletion pkg/datapath/linux/node_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package linux

import (
"context"
"crypto/rand"
"net"
"runtime"
"sync"
"testing"
"time"

"github.com/cilium/cilium/pkg/bpf"
"github.com/cilium/cilium/pkg/cidr"
Expand Down Expand Up @@ -1031,6 +1034,7 @@ func (s *linuxPrivilegedIPv4OnlyTestSuite) TestArpPingHandling(c *check.C) {
// related neigh entry.
err = linuxNodeHandler.NodeAdd(nodev1)
c.Assert(err, check.IsNil)
time.Sleep(100 * time.Millisecond) // insertNeighbor is invoked async

// Check whether an arp entry for nodev1 IP addr (=veth1) was added
neighs, err := netlink.NeighList(veth0.Attrs().Index, netlink.FAMILY_V4)
Expand Down Expand Up @@ -1077,6 +1081,7 @@ func (s *linuxPrivilegedIPv4OnlyTestSuite) TestArpPingHandling(c *check.C) {
// Remove nodev1, and check whether the arp entry was removed
err = linuxNodeHandler.NodeDelete(nodev1)
c.Assert(err, check.IsNil)
time.Sleep(100 * time.Millisecond) // deleteNeighbor is invoked async

neighs, err = netlink.NeighList(veth0.Attrs().Index, netlink.FAMILY_V4)
c.Assert(err, check.IsNil)
Expand All @@ -1089,6 +1094,86 @@ func (s *linuxPrivilegedIPv4OnlyTestSuite) TestArpPingHandling(c *check.C) {
}
c.Assert(found, check.Equals, false)

// Create multiple goroutines which call insertNeighbor and check whether
// MAC changes of veth1 are properly handled. This is a basic randomized
// testing of insertNeighbor() fine-grained locking.
err = linuxNodeHandler.NodeAdd(nodev1)
c.Assert(err, check.IsNil)
time.Sleep(100 * time.Millisecond)

rndHWAddr := func() net.HardwareAddr {
mac := make([]byte, 6)
_, err := rand.Read(mac)
c.Assert(err, check.IsNil)
mac[0] = (mac[0] | 2) & 0xfe
return net.HardwareAddr(mac)
}
neighRefCount := func(nextHopStr string) int {
linuxNodeHandler.neighLock.Lock()
defer linuxNodeHandler.neighLock.Unlock()
return linuxNodeHandler.neighNextHopRefCount[nextHopStr]
}
neighHwAddr := func(nextHopStr string) string {
linuxNodeHandler.neighLock.Lock()
defer linuxNodeHandler.neighLock.Unlock()
if neigh, found := linuxNodeHandler.neighByNextHop[nextHopStr]; found {
return neigh.HardwareAddr.String()
}
return ""
}

done := make(chan struct{})
count := 30
var wg sync.WaitGroup
wg.Add(count)
for i := 0; i < count; i++ {
go func() {
defer wg.Done()
ticker := time.NewTicker(100 * time.Millisecond)
for {
linuxNodeHandler.insertNeighbor(context.Background(), &nodev1, true)
select {
case <-ticker.C:
case <-done:
return
}
}
}()
}
for i := 0; i < 10; i++ {
mac := rndHWAddr()
// Change MAC
netns0.Do(func(ns.NetNS) error {
veth1, err := netlink.LinkByName("veth1")
c.Assert(err, check.IsNil)
err = netlink.LinkSetHardwareAddr(veth1, mac)
c.Assert(err, check.IsNil)
return nil
})
// Check that MAC has been changed in the neigh table
time.Sleep(500 * time.Millisecond)
neighs, err = netlink.NeighList(veth0.Attrs().Index, netlink.FAMILY_V4)
c.Assert(err, check.IsNil)
found = false
for _, n := range neighs {
if n.IP.Equal(ip1) && n.State == netlink.NUD_PERMANENT {
c.Assert(n.HardwareAddr.String(), check.Equals, mac.String())
c.Assert(neighHwAddr(ip1.String()), check.Equals, mac.String())
c.Assert(neighRefCount(ip1.String()), check.Equals, 1)
found = true
break
}
}
c.Assert(found, check.Equals, true)

}
// Cleanup
close(done)
wg.Wait()
err = linuxNodeHandler.NodeDelete(nodev1)
c.Assert(err, check.IsNil)
time.Sleep(100 * time.Millisecond) // deleteNeighbor is invoked async

// Setup routine for the 2. test
setupRemoteNode := func(vethName, vethPeerName, netnsName, vethCIDR, vethIPAddr,
vethPeerIPAddr string) (cleanup func(), errRet error) {
Expand Down Expand Up @@ -1240,6 +1325,7 @@ func (s *linuxPrivilegedIPv4OnlyTestSuite) TestArpPingHandling(c *check.C) {
IPAddresses: []nodeTypes.Address{{nodeaddressing.NodeInternalIP, node3IP}},
}
c.Assert(linuxNodeHandler.NodeAdd(nodev3), check.IsNil)
time.Sleep(100 * time.Millisecond) // insertNeighbor is invoked async

nextHop := net.ParseIP("9.9.9.250")
// Check that both node{2,3} are via nextHop (gw)
Expand Down Expand Up @@ -1269,9 +1355,11 @@ func (s *linuxPrivilegedIPv4OnlyTestSuite) TestArpPingHandling(c *check.C) {

// However, removing node3 should remove the neigh entry for nextHop
c.Assert(linuxNodeHandler.NodeDelete(nodev3), check.IsNil)
time.Sleep(100 * time.Millisecond) // deleteNeighbor is invoked async

found = false
neighs, err = netlink.NeighList(veth0.Attrs().Index, netlink.FAMILY_V4)
c.Assert(err, check.IsNil)
found = false
for _, n := range neighs {
if n.IP.Equal(nextHop) && n.State == netlink.NUD_PERMANENT {
found = true
Expand Down