Skip to content

Commit

Permalink
Fix AntreaProxy not deleting stale UDP conntrack entries for the virt…
Browse files Browse the repository at this point in the history
…ual NodePort DNAT IP (#6379)

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Jun 6, 2024
1 parent 84db074 commit 2e48977
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 9 deletions.
42 changes: 37 additions & 5 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ func (p *proxier) removeStaleServiceConntrackEntries(svcPortName k8sproxy.Servic
svcPort := uint16(svcInfo.Port())
nodePort := uint16(svcInfo.NodePort())
svcProto := svcInfo.OFProtocol
virtualNodePortDNATIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
virtualNodePortDNATIP = agentconfig.VirtualNodePortDNATIPv6
}

svcIPToPort := make(map[string]uint16)
svcIPToPort[svcInfo.ClusterIP().String()] = svcPort
Expand All @@ -354,6 +358,7 @@ func (p *proxier) removeStaleServiceConntrackEntries(svcPortName k8sproxy.Servic
for _, nodeIP := range p.nodePortAddresses {
svcIPToPort[nodeIP.String()] = nodePort
}
svcIPToPort[virtualNodePortDNATIP.String()] = nodePort
}

for svcIPStr, port := range svcIPToPort {
Expand All @@ -372,17 +377,23 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
svcPort := uint16(svcInfo.Port())
pNodePort := uint16(pSvcInfo.NodePort())
nodePort := uint16(svcInfo.NodePort())
pClusterIP := pSvcInfo.ClusterIP().String()
clusterIP := svcInfo.ClusterIP().String()
pExternalIPStrings := pSvcInfo.ExternalIPStrings()
externalIPStrings := svcInfo.ExternalIPStrings()
pLoadBalancerIPStrings := pSvcInfo.LoadBalancerIPStrings()
loadBalancerIPStrings := svcInfo.LoadBalancerIPStrings()
virtualNodePortDNATIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
virtualNodePortDNATIP = agentconfig.VirtualNodePortDNATIPv6
}
var svcPortChanged, svcNodePortChanged bool

staleSvcIPToPort := make(map[string]uint16)
// If the port of the Service is changed, delete all conntrack entries related to the previous Service IPs and the
// previous Service port. These previous Service IPs includes external IPs, loadBalancer IPs and the ClusterIP.
if pSvcPort != svcPort {
staleSvcIPToPort[pSvcInfo.ClusterIP().String()] = pSvcPort
staleSvcIPToPort[pClusterIP] = pSvcPort
for _, ip := range pExternalIPStrings {
staleSvcIPToPort[ip] = pSvcPort
}
Expand All @@ -394,7 +405,10 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
svcPortChanged = true
} else {
// If the port of the Service is not changed, delete the conntrack entries related to the stale Service IPs and
// the Service port. These stale Service IPs could be from external IPs and loadBalancer IPs.
// the Service port. These stale Service IPs could be clusterIP, externalIPs or loadBalancerIPs.
if pClusterIP != clusterIP {
staleSvcIPToPort[pClusterIP] = pSvcPort
}
deletedExternalIPs := smallSliceDifference(pExternalIPStrings, externalIPStrings)
deletedLoadBalancerIPs := smallSliceDifference(pLoadBalancerIPStrings, loadBalancerIPStrings)
for _, ip := range deletedExternalIPs {
Expand All @@ -404,11 +418,13 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
staleSvcIPToPort[ip] = pSvcPort
}
}
// If the NodePort of the Service is changed, delete the contrack entries related to the Node IPs and the Service nodePort.
// If the NodePort of the Service is changed, delete the conntrack entries related to each of the Node IPs / the
// virtual IP to which NodePort traffic from external will be DNATed and the Service nodePort.
if pNodePort != nodePort {
for _, nodeIP := range p.nodePortAddresses {
staleSvcIPToPort[nodeIP.String()] = pNodePort
}
staleSvcIPToPort[virtualNodePortDNATIP.String()] = pNodePort
svcNodePortChanged = true
}
// Delete the conntrack entries due to the change of the Service.
Expand All @@ -423,7 +439,7 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
remainingSvcIPToPort := make(map[string]uint16)
if !svcPortChanged {
// Get all remaining Service IPs.
remainingSvcIPToPort[svcInfo.ClusterIP().String()] = svcPort
remainingSvcIPToPort[clusterIP] = svcPort
for _, ip := range smallSliceSame(pExternalIPStrings, externalIPStrings) {
remainingSvcIPToPort[ip] = svcPort
}
Expand Down Expand Up @@ -744,6 +760,21 @@ func (p *proxier) installServices() {
pSvcInfo.ExternalPolicyLocal() != svcInfo.ExternalPolicyLocal() ||
pSvcInfo.InternalPolicyLocal() != svcInfo.InternalPolicyLocal()
if p.cleanupStaleUDPSvcConntrack && needClearConntrackEntries(pSvcInfo.OFProtocol) {
// We clean the UDP conntrack entries for the following Service update cases:
// - Service port changed, clean the conntrack entries matched by each of the current clusterIP / externalIPs
// / loadBalancerIPs and the stale Service port.
// - ClusterIP changed, clean the conntrack entries matched by the clusterIP and the Service port.
// - Some externalIPs / loadBalancerIPs are removed, clean the conntrack entries matched by each of the
// removed Service IPs and the current Service port.
// - Service nodePort changed, clean the conntrack entries matched by each of the Node IPs / the virtual
// NodePort DNAT IP and the stale Service nodePort.
// However, we DO NOT clean the UDP conntrack entries related to remote Endpoints that are still
// referenced by the Service but are no longer selectable Endpoints for the corresponding Service IPs
// (for externalTrafficPolicy, these IPs are loadBalancerIPs, externalIPs and NodeIPs; for
// internalTrafficPolicy, these IPs clusterIPs) when externalTrafficPolicy or internalTrafficPolicy is
// changed from Cluster to Local. Consequently, the connections, which are supposed to select local
// Endpoints, will continue to send packets to remote Endpoints due to the existing UDP conntrack entries
// until timeout.
needCleanupStaleUDPServiceConntrack = svcInfo.Port() != pSvcInfo.Port() ||
svcInfo.ClusterIP().String() != pSvcInfo.ClusterIP().String() ||
needUpdateServiceExternalAddresses
Expand All @@ -761,7 +792,8 @@ func (p *proxier) installServices() {
if len(staleEndpoints) > 0 || len(newEndpoints) > 0 {
needUpdateEndpoints = true
}
// If there are stale Endpoints for a UDP Service, conntrack connections of these stale Endpoints should be deleted.
// We also clean the conntrack entries related to the stale Endpoints for a UDP Service. Conntrack entries
// matched by each of stale Endpoint IPs and each of the remaining Service IPs and ports will be deleted.
if len(staleEndpoints) > 0 && needClearConntrackEntries(svcInfo.OFProtocol) {
needCleanupStaleUDPServiceConntrack = true
}
Expand Down
39 changes: 35 additions & 4 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,7 @@ func TestDualStackService(t *testing.T) {
fpv6 := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, true)

svc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
svc.Spec.IPFamilyPolicy = ptr.To(corev1.IPFamilyPolicyPreferDualStack)
svc.Spec.ClusterIP = svc1IPv4.String()
svc.Spec.ClusterIPs = []string{svc1IPv4.String(), svc1IPv6.String()}
svc.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv4Protocol, corev1.IPv6Protocol}
Expand Down Expand Up @@ -1364,8 +1365,9 @@ func TestDualStackService(t *testing.T) {
fpv6.OnEndpointSliceUpdate(nil, epv6)
fpv6.OnEndpointsSynced()

mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IPv4.String(), "", "", svcPort, false, true, true, false, nil)}).Times(1)
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1)
expectedIPv4Eps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IPv4.String(), "", "", svcPort, false, true, true, false, nil)}
mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, expectedIPv4Eps).Times(1)
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, expectedIPv4Eps).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{
ServiceIP: svc1IPv4,
ServicePort: uint16(svcPort),
Expand All @@ -1375,8 +1377,9 @@ func TestDualStackService(t *testing.T) {
ClusterGroupID: 1,
}).Times(1)

mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IPv6.String(), "", "", svcPort, false, true, true, false, nil)}).Times(1)
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCPv6, gomock.Any()).Times(1)
expectedIPv6Eps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep1IPv6.String(), "", "", svcPort, false, true, true, false, nil)}
mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, expectedIPv6Eps).Times(1)
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCPv6, expectedIPv6Eps).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{
ServiceIP: svc1IPv6,
ServicePort: uint16(svcPort),
Expand All @@ -1390,6 +1393,32 @@ func TestDualStackService(t *testing.T) {
fpv6.syncProxyRules()
assert.Contains(t, fpv4.serviceInstalledMap, svcPortName)
assert.Contains(t, fpv6.serviceInstalledMap, svcPortName)

updatedSvc := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
svc.Spec.IPFamilyPolicy = ptr.To(corev1.IPFamilyPolicySingleStack)
svc.Spec.ClusterIP = svc1IPv4.String()
svc.Spec.ClusterIPs = []string{svc1IPv4.String()}
svc.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv4Protocol}
svc.Spec.Ports = []corev1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: corev1.ProtocolTCP,
}}
})
fpv4.OnServiceUpdate(svc, updatedSvc)
fpv4.OnServiceSynced()
fpv6.OnServiceUpdate(svc, updatedSvc)
fpv6.OnServiceSynced()

mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv6, uint16(svcPort), binding.ProtocolTCPv6).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(binding.GroupIDType(2)).Times(1)
mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCPv6, expectedIPv6Eps).Times(1)

fpv4.syncProxyRules()
fpv6.syncProxyRules()

assert.Contains(t, fpv4.serviceInstalledMap, svcPortName)
assert.NotContains(t, fpv6.serviceInstalledMap, svcPortName)
}

func getAPIProtocol(bindingProtocol binding.Protocol) corev1.Protocol {
Expand Down Expand Up @@ -1626,6 +1655,7 @@ func testNodePortRemove(t *testing.T, bindingProtocol binding.Protocol, isIPv6 b
if needClearConntrackEntries(bindingProtocol) {
mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(vIP, uint16(svcNodePort), nil, bindingProtocol)
if externalIP != nil {
mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol)
}
Expand Down Expand Up @@ -1764,6 +1794,7 @@ func testLoadBalancerRemove(t *testing.T, bindingProtocol binding.Protocol, isIP
if needClearConntrackEntries(bindingProtocol) {
mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(svcNodePortIP, uint16(svcNodePort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(vIP, uint16(svcNodePort), nil, bindingProtocol)
mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), nil, bindingProtocol)
if externalIP != nil {
mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol)
Expand Down

0 comments on commit 2e48977

Please sign in to comment.