Skip to content

Commit

Permalink
feat(ipset): consolidate ipset usage across controllers
Browse files Browse the repository at this point in the history
Before this, we had 2 different ways to interact with ipsets, through
the handler interface which had the best handling for IPv6 because NPC
heavily utilizes it, and through the ipset struct which mostly repeated
the handler logic, but didn't handle some key things.

NPC utilized the handler functions and NSC / NRC mostly utilized the old
ipset struct functions. This caused a lot of duplication between the two
groups of functions and also caused issues with proper IPv6 handling.

This commit consolidates the two sets of usage into just the handler
interface. This greatly simplifies how the controllers interact with
ipsets and it also reduces the logic complexity on the ipset side.

This also fixes up some inconsistency with how we handled IPv6 ipset
names. ipset likes them to be prefixed with inet6:, but we weren't
always doing this in a way that made sense and was consistent across all
functions in the ipset struct.
  • Loading branch information
aauren committed Sep 23, 2023
1 parent 854440d commit a3c5694
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 232 deletions.
4 changes: 4 additions & 0 deletions pkg/controllers/netpol/network_policy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,10 @@ func (ips *fakeIPSet) Sets() map[string]*utils.Set {
return nil
}

func (ips *fakeIPSet) Name(name string) string {
return name
}

func TestNetworkPolicyController(t *testing.T) {
testCases := []tNetPolConfigTestCase{
{
Expand Down
62 changes: 23 additions & 39 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ const (
svcSchedFlagsAnnotation = "kube-router.io/service.schedflags"

// All IPSET names need to be less than 31 characters in order for the Kernel to accept them. Keep in mind that the
// actual formulation for this may be inet6:<setNameBase> depending on ip family so that means that these base names
// actually need to be less than 25 characters
// actual formulation for this may be inet6:<setNameBase> depending on ip family, plus when we change ipsets we use
// a swap operation that adds a hyphen to the end, so that means that these base names actually need to be less than
// 24 characters
localIPsIPSetName = "kube-router-local-ips"
serviceIPPortsSetName = "kube-router-svip-prt"
serviceIPsIPSetName = "kube-router-svip"
Expand Down Expand Up @@ -120,11 +121,6 @@ type NetworkServicesController struct {
ipsetMutex *sync.Mutex
fwMarkMap map[uint32]string

// Map of ipsets that we use.
localIPsIPSets map[v1.IPFamily]*utils.Set
serviceIPPortsIPSet map[v1.IPFamily]*utils.Set
serviceIPsIPSet map[v1.IPFamily]*utils.Set

svcLister cache.Indexer
epSliceLister cache.Indexer
podLister cache.Indexer
Expand Down Expand Up @@ -399,34 +395,27 @@ func (nsc *NetworkServicesController) setupIpvsFirewall() error {
- create ipsets
- create firewall rules
*/

var err error
var ipset *utils.Set

// Remember ipsets for use in syncIpvsFirewall
nsc.localIPsIPSets = make(map[v1.IPFamily]*utils.Set)
nsc.serviceIPPortsIPSet = make(map[v1.IPFamily]*utils.Set)
nsc.serviceIPsIPSet = make(map[v1.IPFamily]*utils.Set)
for family, ipSetHandler := range nsc.ipSetHandlers {
// Initialize some blank ipsets with the correct names in order to use them in the iptables below. We don't need
// to retain references to them, because we'll use the handler to refresh them later in syncIpvsFirewall
for _, ipSetHandler := range nsc.ipSetHandlers {
// Create ipset for local addresses.
ipset, err = ipSetHandler.Create(localIPsIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
_, err = ipSetHandler.Create(localIPsIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s - %v", localIPsIPSetName, err)
}
nsc.localIPsIPSets[family] = ipset

// Create 2 ipsets for services. One for 'ip' and one for 'ip,port'
ipset, err = ipSetHandler.Create(serviceIPsIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
_, err = ipSetHandler.Create(serviceIPsIPSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s - %v", serviceIPsIPSetName, err)
}
nsc.serviceIPsIPSet[family] = ipset

ipset, err = ipSetHandler.Create(serviceIPPortsSetName, utils.TypeHashIPPort, utils.OptionTimeout, "0")
_, err = ipSetHandler.Create(serviceIPPortsSetName, utils.TypeHashIPPort, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s - %v", serviceIPPortsSetName, err)
}
nsc.serviceIPPortsIPSet[family] = ipset
}

// Setup a custom iptables chain to explicitly allow input traffic to ipvs services only.
Expand Down Expand Up @@ -612,16 +601,13 @@ func (nsc *NetworkServicesController) syncIpvsFirewall() error {

for family, addrs := range addrsMap {
// Convert addrs from a slice of net.IP to a slice of string
localIPsSets := make([]string, 0, len(addrs))
localIPsSets := make([][]string, 0, len(addrs))
for _, addr := range addrs {
localIPsSets = append(localIPsSets, addr.String())
localIPsSets = append(localIPsSets, []string{addr.String(), utils.OptionTimeout, "0"})
}

// Refresh the family specific IPSet with the slice of strings
err = nsc.localIPsIPSets[family].Refresh(localIPsSets)
if err != nil {
return fmt.Errorf("failed to sync ipset: %s", err.Error())
}
nsc.ipSetHandlers[family].RefreshSet(localIPsIPSetName, localIPsSets, utils.TypeHashIP)
}

// Populate service ipsets.
Expand All @@ -630,8 +616,8 @@ func (nsc *NetworkServicesController) syncIpvsFirewall() error {
return errors.New("Failed to list IPVS services: " + err.Error())
}

serviceIPsSets := make(map[v1.IPFamily][]string)
serviceIPPortsIPSets := make(map[v1.IPFamily][]string)
serviceIPsSets := make(map[v1.IPFamily][][]string)
serviceIPPortsIPSets := make(map[v1.IPFamily][][]string)

for _, ipvsService := range ipvsServices {
var address net.IP
Expand Down Expand Up @@ -667,24 +653,22 @@ func (nsc *NetworkServicesController) syncIpvsFirewall() error {
family = v1.IPv6Protocol
}

serviceIPsSets[family] = append(serviceIPsSets[family], address.String())
serviceIPsSets[family] = append(serviceIPsSets[family], []string{address.String(), utils.OptionTimeout, "0"})

ipvsAddressWithPort := fmt.Sprintf("%s,%s:%d", address, protocol, port)
serviceIPPortsIPSets[family] = append(serviceIPPortsIPSets[family], ipvsAddressWithPort)
serviceIPPortsIPSets[family] = append(serviceIPPortsIPSets[family],
[]string{ipvsAddressWithPort, utils.OptionTimeout, "0"})

}

for family := range nsc.ipSetHandlers {
serviceIPsIPSet := nsc.serviceIPsIPSet[family]
err = serviceIPsIPSet.Refresh(serviceIPsSets[family])
if err != nil {
return fmt.Errorf("failed to sync ipset: %v", err)
}
for family, setHandler := range nsc.ipSetHandlers {
setHandler.RefreshSet(serviceIPsIPSetName, serviceIPsSets[family], utils.TypeHashIP)

setHandler.RefreshSet(serviceIPPortsSetName, serviceIPPortsIPSets[family], utils.TypeHashIPPort)

serviceIPPortsIPSet := nsc.serviceIPPortsIPSet[family]
err = serviceIPPortsIPSet.Refresh(serviceIPPortsIPSets[family])
err := setHandler.Restore()
if err != nil {
return fmt.Errorf("failed to sync ipset: %v", err)
return fmt.Errorf("could not save ipset for service firewall: %v", err)
}
}

Expand Down
54 changes: 14 additions & 40 deletions pkg/controllers/routing/network_routes_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,8 @@ func (nrc *NetworkRoutingController) syncNodeIPSets() error {
nodes := nrc.nodeLister.List()

// Collect active PodCIDR(s) and NodeIPs from nodes
currentPodCidrs := make(map[v1core.IPFamily][]string)
currentNodeIPs := make(map[v1core.IPFamily][]string)
currentPodCidrs := make(map[v1core.IPFamily][][]string)
currentNodeIPs := make(map[v1core.IPFamily][][]string)
for _, obj := range nodes {
node := obj.(*v1core.Node)
podCIDRs := getPodCIDRsFromAllNodeSources(node)
Expand All @@ -978,26 +978,28 @@ func (nrc *NetworkRoutingController) syncNodeIPSets() error {
klog.Warningf("Wasn't able to parse pod CIDR %s for node %s, skipping", cidr, node.Name)
}
if ip.To4() != nil {
currentPodCidrs[v1core.IPv4Protocol] = append(currentPodCidrs[v1core.IPv4Protocol], cidr)
currentPodCidrs[v1core.IPv4Protocol] = append(currentPodCidrs[v1core.IPv4Protocol],
[]string{cidr, utils.OptionTimeout, "0"})
} else {
currentPodCidrs[v1core.IPv6Protocol] = append(currentPodCidrs[v1core.IPv6Protocol], cidr)
currentPodCidrs[v1core.IPv6Protocol] = append(currentPodCidrs[v1core.IPv6Protocol],
[]string{cidr, utils.OptionTimeout, "0"})
}
}

var ipv4Addrs, ipv6Addrs []string
var ipv4Addrs, ipv6Addrs [][]string
intExtNodeIPsv4, intExtNodeIPsv6 := utils.GetAllNodeIPs(node)
if err != nil {
klog.Errorf("Failed to find a node IP, cannot add to node ipset which could affect routing: %v", err)
continue
}
for _, nodeIPv4s := range intExtNodeIPsv4 {
for _, nodeIPv4 := range nodeIPv4s {
ipv4Addrs = append(ipv4Addrs, nodeIPv4.String())
ipv4Addrs = append(ipv4Addrs, []string{nodeIPv4.String(), utils.OptionTimeout, "0"})
}
}
for _, nodeIPv6s := range intExtNodeIPsv6 {
for _, nodeIPv6 := range nodeIPv6s {
ipv6Addrs = append(ipv6Addrs, nodeIPv6.String())
ipv6Addrs = append(ipv6Addrs, []string{nodeIPv6.String(), utils.OptionTimeout, "0"})
}
}
currentNodeIPs[v1core.IPv4Protocol] = append(currentNodeIPs[v1core.IPv4Protocol], ipv4Addrs...)
Expand All @@ -1006,41 +1008,13 @@ func (nrc *NetworkRoutingController) syncNodeIPSets() error {

// Syncing Pod subnet ipset entries
for family, ipSetHandler := range nrc.ipSetHandlers {
psSet := ipSetHandler.Get(podSubnetsIPSetName)
if psSet == nil {
klog.Infof("Creating missing ipset \"%s\"", podSubnetsIPSetName)
_, err = ipSetHandler.Create(podSubnetsIPSetName, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("ipset \"%s\" not found in controller instance",
podSubnetsIPSetName)
}
psSet = ipSetHandler.Get(podSubnetsIPSetName)
if nil == psSet {
return fmt.Errorf("failed to get ipsethandler for ipset \"%s\"", podSubnetsIPSetName)
}
}
err = psSet.Refresh(currentPodCidrs[family])
if err != nil {
return fmt.Errorf("failed to sync Pod Subnets ipset: %s", err)
}
ipSetHandler.RefreshSet(podSubnetsIPSetName, currentPodCidrs[family], utils.TypeHashNet)

// Syncing Node Addresses ipset entries
naSet := ipSetHandler.Get(nodeAddrsIPSetName)
if naSet == nil {
klog.Infof("Creating missing ipset \"%s\"", nodeAddrsIPSetName)
_, err = ipSetHandler.Create(nodeAddrsIPSetName, utils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("ipset \"%s\" not found in controller instance",
nodeAddrsIPSetName)
}
naSet = ipSetHandler.Get(nodeAddrsIPSetName)
if nil == naSet {
return fmt.Errorf("failed to get ipsethandler for ipset \"%s\"", nodeAddrsIPSetName)
}
}
err = naSet.Refresh(currentNodeIPs[family])
ipSetHandler.RefreshSet(nodeAddrsIPSetName, currentNodeIPs[family], utils.TypeHashIP)

err = ipSetHandler.Restore()
if err != nil {
return fmt.Errorf("failed to sync Node Addresses ipset: %s", err)
return fmt.Errorf("failed to sync pod subnets / node addresses ipsets: %v", err)
}
}
return nil
Expand Down
Loading

0 comments on commit a3c5694

Please sign in to comment.