Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize NodePort performance by reducing request packets CT actions #3862

Merged
merged 1 commit into from Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 13 additions & 8 deletions pkg/agent/config/node_config.go
Expand Up @@ -45,18 +45,23 @@ const (
)

var (
// VirtualServiceIPv4 / VirtualServiceIPv6 are used in the following situations:
// - Use the virtual IP to perform SNAT for packets of Service from Antrea gateway and the Endpoint is not on
// local Pod CIDR or any remote Pod CIDRs. It is used in OVS flow of table serviceConntrackCommitTable.
// - Use the virtual IP to perform DNAT for packets of NodePort on host. It is used in iptables rules on host.
// - Use the virtual IP as onlink routing entry gateway in host routing entry.
// - Use the virtual IP as destination IP in host routing entry. It is used to forward DNATed NodePort packets
// or replied SNATed Service packets back to Antrea gateway.
// - Use the virtual IP for InternalIPAddress parameter of Add-NetNatStaticMapping.
// VirtualServiceIPv4 or VirtualServiceIPv6 is used in the following scenarios:
hongliangl marked this conversation as resolved.
Show resolved Hide resolved
// - The IP is used to perform SNAT for packets of Service sourced from Antrea gateway and destined for external
// network via Antrea gateway.
// - The IP is used as destination IP in host routing entry to forward replied SNATed Service packets back to Antrea
// gateway.
// - The IP is used as the next hop of host routing entry for ClusterIP and virtual NodePort DNAT IP.
// - The IP is used for InternalIPAddress parameter of Add-NetNatStaticMapping on Windows.
// The IP cannot be one used in the network, and cannot be within the 169.254.1.0 - 169.254.254.255 range
// according to https://datatracker.ietf.org/doc/html/rfc3927#section-2.1
VirtualServiceIPv4 = net.ParseIP("169.254.0.253")
VirtualServiceIPv6 = net.ParseIP("fc01::aabb:ccdd:eeff")

// VirtualNodePortDNATIPv4 or VirtualNodePortDNATIPv6 is used in the following scenarios:
// - The IP is used to perform DNAT on host for packets of NodePort sourced from local Node or external network.
// - The IP is used as destination IP in host routing entry to forward DNATed NodePort packets to Antrea gateway
VirtualNodePortDNATIPv4 = net.ParseIP("169.254.0.252")
VirtualNodePortDNATIPv6 = net.ParseIP("fc01::aabb:ccdd:eefe")
)

type GatewayConfig struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/openflow/pipeline.go
Expand Up @@ -2280,13 +2280,13 @@ func (f *featureService) nodePortMarkFlows() []binding.Flow {
Action().LoadRegMark(ToNodePortAddressRegMark).
Done())
}
// This generates the flow for the virtual IP. The flow is used to mark the first packet of NodePort connection from
// the Antrea gateway (the connection is performed DNAT with the virtual IP in host netns).
// This generates the flow for the virtual NodePort DNAT IP. The flow is used to mark the first packet of NodePort
// connection sourced from the Antrea gateway (the connection is performed DNAT with the virtual IP in host netns).
flows = append(flows,
NodePortMarkTable.ofTable.BuildFlow(priorityNormal).
Cookie(cookieID).
MatchProtocol(ipProtocol).
MatchDstIP(f.virtualIPs[ipProtocol]).
MatchDstIP(f.virtualNodePortDNATIPs[ipProtocol]).
Action().LoadRegMark(ToNodePortAddressRegMark).
Done())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/pipeline_test.go
Expand Up @@ -245,7 +245,7 @@ func TestBuildPipeline(t *testing.T) {
EgressMetricTable,
L3ForwardingTable,
L3DecTTLTable,
ServiceMarkTable,
SNATMarkTable,
SNATTable,
L2ForwardingCalcTable,
AntreaPolicyIngressRuleTable,
Expand Down
57 changes: 31 additions & 26 deletions pkg/agent/openflow/service.go
Expand Up @@ -33,14 +33,15 @@ type featureService struct {
cachedFlows *flowCategoryCache
groupCache sync.Map

gatewayIPs map[binding.Protocol]net.IP
virtualIPs map[binding.Protocol]net.IP
dnatCtZones map[binding.Protocol]int
snatCtZones map[binding.Protocol]int
gatewayMAC net.HardwareAddr
nodePortAddresses map[binding.Protocol][]net.IP
serviceCIDRs map[binding.Protocol]net.IPNet
networkConfig *config.NetworkConfig
gatewayIPs map[binding.Protocol]net.IP
virtualIPs map[binding.Protocol]net.IP
virtualNodePortDNATIPs map[binding.Protocol]net.IP
dnatCtZones map[binding.Protocol]int
snatCtZones map[binding.Protocol]int
gatewayMAC net.HardwareAddr
nodePortAddresses map[binding.Protocol][]net.IP
serviceCIDRs map[binding.Protocol]net.IPNet
networkConfig *config.NetworkConfig

enableProxy bool
proxyAll bool
Expand All @@ -66,6 +67,7 @@ func newFeatureService(
connectUplinkToBridge bool) *featureService {
gatewayIPs := make(map[binding.Protocol]net.IP)
virtualIPs := make(map[binding.Protocol]net.IP)
virtualNodePortDNATIPs := make(map[binding.Protocol]net.IP)
dnatCtZones := make(map[binding.Protocol]int)
snatCtZones := make(map[binding.Protocol]int)
nodePortAddresses := make(map[binding.Protocol][]net.IP)
Expand All @@ -74,6 +76,7 @@ func newFeatureService(
if ipProtocol == binding.ProtocolIP {
gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv4
virtualIPs[ipProtocol] = config.VirtualServiceIPv4
virtualNodePortDNATIPs[ipProtocol] = config.VirtualNodePortDNATIPv4
dnatCtZones[ipProtocol] = CtZone
snatCtZones[ipProtocol] = SNATCtZone
nodePortAddresses[ipProtocol] = serviceConfig.NodePortAddressesIPv4
Expand All @@ -83,6 +86,7 @@ func newFeatureService(
} else if ipProtocol == binding.ProtocolIPv6 {
gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv6
virtualIPs[ipProtocol] = config.VirtualServiceIPv6
virtualNodePortDNATIPs[ipProtocol] = config.VirtualNodePortDNATIPv6
dnatCtZones[ipProtocol] = CtZoneV6
snatCtZones[ipProtocol] = SNATCtZoneV6
nodePortAddresses[ipProtocol] = serviceConfig.NodePortAddressesIPv6
Expand All @@ -93,24 +97,25 @@ func newFeatureService(
}

return &featureService{
cookieAllocator: cookieAllocator,
ipProtocols: ipProtocols,
bridge: bridge,
cachedFlows: newFlowCategoryCache(),
groupCache: sync.Map{},
gatewayIPs: gatewayIPs,
virtualIPs: virtualIPs,
dnatCtZones: dnatCtZones,
snatCtZones: snatCtZones,
nodePortAddresses: nodePortAddresses,
serviceCIDRs: serviceCIDRs,
gatewayMAC: nodeConfig.GatewayConfig.MAC,
networkConfig: networkConfig,
enableProxy: enableProxy,
proxyAll: proxyAll,
connectUplinkToBridge: connectUplinkToBridge,
ctZoneSrcField: getZoneSrcField(connectUplinkToBridge),
category: cookie.Service,
cookieAllocator: cookieAllocator,
ipProtocols: ipProtocols,
bridge: bridge,
cachedFlows: newFlowCategoryCache(),
groupCache: sync.Map{},
gatewayIPs: gatewayIPs,
virtualIPs: virtualIPs,
virtualNodePortDNATIPs: virtualNodePortDNATIPs,
dnatCtZones: dnatCtZones,
snatCtZones: snatCtZones,
nodePortAddresses: nodePortAddresses,
serviceCIDRs: serviceCIDRs,
gatewayMAC: nodeConfig.GatewayConfig.MAC,
networkConfig: networkConfig,
enableProxy: enableProxy,
proxyAll: proxyAll,
connectUplinkToBridge: connectUplinkToBridge,
ctZoneSrcField: getZoneSrcField(connectUplinkToBridge),
category: cookie.Service,
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/proxy/proxier.go
Expand Up @@ -287,9 +287,9 @@ func smallSliceDifference(s1, s2 []string) []string {
}

func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error {
svcIP := agentconfig.VirtualServiceIPv4
svcIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
svcIP = agentconfig.VirtualServiceIPv6
svcIP = agentconfig.VirtualNodePortDNATIPv6
}
if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeNodePort); err != nil {
return fmt.Errorf("failed to install Service NodePort load balancing flows: %w", err)
Expand All @@ -301,9 +301,9 @@ func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort ui
}

func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Protocol) error {
svcIP := agentconfig.VirtualServiceIPv4
svcIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
svcIP = agentconfig.VirtualServiceIPv6
svcIP = agentconfig.VirtualNodePortDNATIPv6
}
if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove Service NodePort NodePort load balancing flows: %w", err)
Expand Down
54 changes: 49 additions & 5 deletions pkg/agent/route/route_linux.go
Expand Up @@ -440,7 +440,13 @@ func (c *Client) syncIPTables() error {
})
// Use iptables-restore to configure IPv4 settings.
if c.networkConfig.IPv4Enabled {
iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv4CIDR, antreaPodIPSet, localAntreaFlexibleIPAMPodIPSet, antreaNodePortIPSet, config.VirtualServiceIPv4, snatMarkToIPv4)
iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv4CIDR,
antreaPodIPSet,
localAntreaFlexibleIPAMPodIPSet,
antreaNodePortIPSet,
config.VirtualNodePortDNATIPv4,
config.VirtualServiceIPv4,
snatMarkToIPv4)
// Setting --noflush to keep the previous contents (i.e. non antrea managed chains) of the tables.
if err := c.ipt.Restore(iptablesData.Bytes(), false, false); err != nil {
return err
Expand All @@ -449,7 +455,13 @@ func (c *Client) syncIPTables() error {

// Use ip6tables-restore to configure IPv6 settings.
if c.networkConfig.IPv6Enabled {
iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv6CIDR, antreaPodIP6Set, localAntreaFlexibleIPAMPodIP6Set, antreaNodePortIP6Set, config.VirtualServiceIPv6, snatMarkToIPv6)
iptablesData := c.restoreIptablesData(c.nodeConfig.PodIPv6CIDR,
antreaPodIP6Set,
localAntreaFlexibleIPAMPodIP6Set,
antreaNodePortIP6Set,
config.VirtualNodePortDNATIPv4,
config.VirtualServiceIPv6,
snatMarkToIPv6)
// Setting --noflush to keep the previous contents (i.e. non antrea managed chains) of the tables.
if err := c.ipt.Restore(iptablesData.Bytes(), false, true); err != nil {
return err
Expand All @@ -458,7 +470,13 @@ func (c *Client) syncIPTables() error {
return nil
}

func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFlexibleIPAMPodIPSet, nodePortIPSet string, serviceVirtualIP net.IP, snatMarkToIP map[uint32]net.IP) *bytes.Buffer {
func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
podIPSet,
localAntreaFlexibleIPAMPodIPSet,
nodePortIPSet string,
nodePortDNATVirtualIP,
serviceVirtualIP net.IP,
snatMarkToIP map[uint32]net.IP) *bytes.Buffer {
// Create required rules in the antrea chains.
// Use iptables-restore as it flushes the involved chains and creates the desired rules
// with a single call, instead of string matching to clean up stale rules.
Expand Down Expand Up @@ -571,15 +589,15 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFl
"-m", "comment", "--comment", `"Antrea: DNAT external to NodePort packets"`,
"-m", "set", "--match-set", nodePortIPSet, "dst,dst",
"-j", iptables.DNATTarget,
"--to-destination", serviceVirtualIP.String(),
"--to-destination", nodePortDNATVirtualIP.String(),
}...)
writeLine(iptablesData, iptables.MakeChainLine(antreaOutputChain))
writeLine(iptablesData, []string{
"-A", antreaOutputChain,
"-m", "comment", "--comment", `"Antrea: DNAT local to NodePort packets"`,
"-m", "set", "--match-set", nodePortIPSet, "dst,dst",
"-j", iptables.DNATTarget,
"--to-destination", serviceVirtualIP.String(),
"--to-destination", nodePortDNATVirtualIP.String(),
}...)
}
writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain))
Expand Down Expand Up @@ -694,11 +712,17 @@ func (c *Client) initServiceIPRoutes() error {
if err := c.addVirtualServiceIPRoute(false); err != nil {
return err
}
if err := c.addVirtualNodePortDNATIPRoute(false); err != nil {
return err
}
}
if c.networkConfig.IPv6Enabled {
if err := c.addVirtualServiceIPRoute(true); err != nil {
return err
}
if err := c.addVirtualNodePortDNATIPRoute(true); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -1252,6 +1276,26 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error {
return nil
}

func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error {
linkIndex := c.nodeConfig.GatewayConfig.LinkIndex
vIP := config.VirtualNodePortDNATIPv4
gw := config.VirtualServiceIPv4
mask := ipv4AddrLength
if isIPv6 {
vIP = config.VirtualNodePortDNATIPv6
gw = config.VirtualServiceIPv6
mask = ipv6AddrLength
}
route := generateRoute(vIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE)
if err := netlink.RouteReplace(route); err != nil {
return fmt.Errorf("failed to install routing entry for virtual NodePort DNAT IP %s: %w", vIP.String(), err)
}
klog.V(4).InfoS("Added virtual NodePort DNAT IP route", "route", route)
c.serviceRoutes.Store(vIP.String(), route)

return nil
}

// addLoadBalancerIngressIPRoute is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea
// gateway on host.
func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error {
Expand Down
12 changes: 8 additions & 4 deletions pkg/agent/route/route_windows.go
Expand Up @@ -42,9 +42,10 @@ const (
)

var (
antreaNat = util.AntreaNatName
virtualServiceIPv4Net = util.NewIPNet(config.VirtualServiceIPv4)
PodCIDRIPv4 *net.IPNet
antreaNat = util.AntreaNatName
virtualServiceIPv4Net = util.NewIPNet(config.VirtualServiceIPv4)
virtualNodePortDNATIPv4Net = util.NewIPNet(config.VirtualNodePortDNATIPv4)
PodCIDRIPv4 *net.IPNet
)

type Client struct {
Expand Down Expand Up @@ -245,8 +246,11 @@ func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error {
}
klog.InfoS("Added virtual Service IP neighbor", "neighbor", vNeighbor)

if err := c.addServiceRoute(config.VirtualNodePortDNATIPv4); err != nil {
return err
}
// For NodePort Service, a new NetNat for NetNatStaticMapping is needed.
err := util.NewNetNat(antreaNatNodePort, virtualServiceIPv4Net)
err := util.NewNetNat(antreaNatNodePort, virtualNodePortDNATIPv4Net)
if err != nil {
return err
}
Expand Down