Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Internal Traffic Policy in AntreaProxy #2792

Merged
merged 1 commit into from Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
169 changes: 112 additions & 57 deletions pkg/agent/proxy/proxier.go
Expand Up @@ -347,7 +347,6 @@ func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, s
func (p *proxier) installServices() {
for svcPortName, svcPort := range p.serviceMap {
svcInfo := svcPort.(*types.ServiceInfo)
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, false)
endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName]
if !ok {
endpointsInstalled = map[string]k8sproxy.Endpoint{}
Expand All @@ -366,47 +365,88 @@ func (p *proxier) installServices() {
pSvcInfo = installedSvcPort.(*types.ServiceInfo)
needRemoval = serviceIdentityChanged(svcInfo, pSvcInfo) || (svcInfo.SessionAffinityType() != pSvcInfo.SessionAffinityType())
needUpdateService = needRemoval || (svcInfo.StickyMaxAgeSeconds() != pSvcInfo.StickyMaxAgeSeconds())
needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType() || pSvcInfo.NodeLocalExternal() != svcInfo.NodeLocalExternal()
needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType() ||
pSvcInfo.NodeLocalExternal() != svcInfo.NodeLocalExternal() ||
pSvcInfo.NodeLocalInternal() != svcInfo.NodeLocalInternal()
} else { // Need to install.
needUpdateService = true
}

var endpointUpdateList []k8sproxy.Endpoint
var internalNodeLocal, externalNodeLocal bool
if svcInfo.NodeLocalInternal() {
internalNodeLocal = true
}
if p.proxyAll && svcInfo.NodeLocalExternal() {
externalNodeLocal = true
}

var allEndpointUpdateList, localEndpointUpdateList []k8sproxy.Endpoint
if len(endpoints) > maxEndpoints {
if !p.oversizeServiceSet.Has(svcPortName.String()) {
klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints)
p.oversizeServiceSet.Insert(svcPortName.String())
}
// If the length of endpoints > maxEndpoints, endpoints should be cut. However, endpoints is a map. Therefore,
// iterate the map and append every Endpoint to a slice endpointList. Since the iteration order of map in
// Golang is random, if cut directly without any sorting, some Endpoints may not be installed. So cutting
// slice endpointList after sorting can avoid this situation in some degree.
var endpointList []k8sproxy.Endpoint
// If the length of endpoints > maxEndpoints, endpoints should be cut. However, since endpoints is a map, iterate
// the map and append every Endpoint to a target slice. When the Endpoint is local, append Endpoint to slice
// localEndpointList, otherwise append the Endpoint to slice remoteEndpointList. Since the iteration order
// of map in Golang is not guaranteed, if split the Endpoints directly without any sorting, some Endpoints
// may not be installed, so split the endpointList after sorting.
var remoteEndpointList, localEndpointList []k8sproxy.Endpoint
for _, endpoint := range endpoints {
endpointList = append(endpointList, endpoint)
if endpoint.GetIsLocal() {
localEndpointList = append(localEndpointList, endpoint)
} else {
remoteEndpointList = append(remoteEndpointList, endpoint)
}
}
sort.Sort(byEndpoint(endpointList))
endpointList = endpointList[:maxEndpoints]

for _, endpoint := range endpointList { // Check if there is any installed Endpoint which is not expected anymore.
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
sort.Sort(byEndpoint(remoteEndpointList))
sort.Sort(byEndpoint(localEndpointList))

if len(localEndpointList) > maxEndpoints {
// When the number of local Endpoints is greater than maxEndpoints, choose maxEndpoints Endpoints from
// localEndpointList to install.
allEndpointUpdateList = localEndpointList[:maxEndpoints]
} else {
// When the number of local Endpoints is smaller than maxEndpoints, choose all Endpoints of localEndpointList
// and part of remoteEndpointList to install.
localEndpointUpdateList = localEndpointList
allEndpointUpdateList = append(localEndpointList, remoteEndpointList[:maxEndpoints-len(localEndpointList)]...)
}
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range allEndpointUpdateList {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
break
}
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}
} else {
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
// Check if there is any installed Endpoint which is not expected anymore. If internalTrafficPolicy and externalTrafficPolicy
// are both Local, only local Endpoints should be installed and checked; if internalTrafficPolicy or externalTrafficPolicy
// is Cluster, all Endpoints should be installed and checked.
for _, endpoint := range endpoints {
if internalNodeLocal && externalNodeLocal && endpoint.GetIsLocal() || !internalNodeLocal || !externalNodeLocal {
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
}
allEndpointUpdateList = append(allEndpointUpdateList, endpoint)
if endpoint.GetIsLocal() {
localEndpointUpdateList = append(localEndpointUpdateList, endpoint)
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}
}

if len(endpoints) < len(endpointsInstalled) { // There are Endpoints which expired.
// If there are expired Endpoints, Endpoints installed should be updated.
if internalNodeLocal && externalNodeLocal && len(localEndpointUpdateList) < len(endpointsInstalled) ||
!(internalNodeLocal && externalNodeLocal) && len(allEndpointUpdateList) < len(endpointsInstalled) {
klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String())
needUpdateEndpoints = true
}
Expand Down Expand Up @@ -437,45 +477,64 @@ func (p *proxier) installServices() {
}

if needUpdateEndpoints {
var endpointUpdateList []k8sproxy.Endpoint
// If the type of the Service is NodePort or LoadBalancer and both internalTrafficPolicy and externalTrafficPolicy
// are Local, or the type of the Service is ClusterIP and internalTrafficPolicy is Local, then only local
// Endpoints should be installed, otherwise all Endpoints should be installed.
if internalNodeLocal && (externalNodeLocal || svcInfo.NodePort() == 0) {
endpointUpdateList = localEndpointUpdateList
} else {
endpointUpdateList = allEndpointUpdateList
}
// Install Endpoints.
err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, endpointUpdateList)
if err != nil {
klog.ErrorS(err, "Error when installing Endpoints flows")
continue
}
err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, endpointUpdateList)
if err != nil {
klog.ErrorS(err, "Error when installing Endpoints groups")
continue
}

if p.proxyAll {
if svcInfo.NodeLocalExternal() {
// Install another group when Service externalTrafficPolicy is Local.
groupIDLocal := p.groupCounter.AllocateIfNotExist(svcPortName, true)
var localEndpointList []k8sproxy.Endpoint
for _, ed := range endpointUpdateList {
if !ed.GetIsLocal() {
continue
}
localEndpointList = append(localEndpointList, ed)
if internalNodeLocal != externalNodeLocal {
if svcInfo.NodePort() > 0 {
// If the type of the Service is NodePort or LoadBalancer, when internalTrafficPolicy and externalTrafficPolicy
// of the Service are different, install two groups. One group has all Endpoints, the other has only
// local Endpoints.
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, true)
if err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointUpdateList); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question - I know it is not introduced by you, but what is the strategy to handle errors? Is it right just to ignore them?

klog.ErrorS(err, "Error when installing Group of local Endpoints for Service", "Service", svcPortName)
continue
}
if err = p.ofClient.InstallServiceGroup(groupIDLocal, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil {
klog.ErrorS(err, "Error when installing Group for Service whose externalTrafficPolicy is Local")
groupID = p.groupCounter.AllocateIfNotExist(svcPortName, false)
if err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, allEndpointUpdateList); err != nil {
klog.ErrorS(err, "Error when installing Group of all Endpoints for Service", "Service", svcPortName)
continue
}
} else {
// Uninstall the group with only local Endpoints when Service externalTrafficPolicy is Cluster
// unconditionally. If the group doesn't exist on OVS, then the return value will be nil; if the
// group exists on OVS, and after it is uninstalled successfully, then the return value will be also
// nil.
if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist {
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
continue
}
p.groupCounter.Recycle(svcPortName, true)
// If the type of the Service is ClusterIP, install a group according to internalTrafficPolicy.
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalNodeLocal)
if err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, endpointUpdateList); err != nil {
klog.ErrorS(err, "Error when installing Group of Endpoints for Service", "Service", svcPortName)
continue
}
}
} else {
// Regardless of the type of the Service, when internalTrafficPolicy and externalTrafficPolicy of the Service
// are the same, only install one group and unconditionally uninstall another group. If both internalTrafficPolicy
// and externalTrafficPolicy are Local, install the group that has only local Endpoints and unconditionally
// uninstall the group which has all Endpoints; if both internalTrafficPolicy and externalTrafficPolicy are
// Cluster, install the group which has all Endpoints and unconditionally uninstall the group which has
// only local Endpoints. Note that, if a group doesn't exist on OVS, then the return value will be nil.
nodeLocalVal := internalNodeLocal && externalNodeLocal
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalVal)
if err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, endpointUpdateList); err != nil {
klog.ErrorS(err, "Error when installing Group of local Endpoints for Service", "Service", svcPortName)
continue
}
if groupID, exist := p.groupCounter.Get(svcPortName, !nodeLocalVal); exist {
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName)
continue
}
p.groupCounter.Recycle(svcPortName, !nodeLocalVal)
}
}

for _, e := range endpointUpdateList {
Expand Down Expand Up @@ -515,20 +574,15 @@ func (p *proxier) installServices() {
}
}

// Install ClusterIP flows of current Service.
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), false, corev1.ServiceTypeClusterIP); err != nil {
// Install ClusterIP flows for the Service.
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalNodeLocal)
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), svcInfo.NodeLocalInternal(), corev1.ServiceTypeClusterIP); err != nil {
klog.Errorf("Error when installing Service flows: %v", err)
continue
}

// If externalTrafficPolicy of the Service is Local, Service NodePort or LoadBalancer should use the Service
// group whose Endpoints are local.
nGroupID := groupID
if svcInfo.NodeLocalExternal() {
nGroupID = p.groupCounter.AllocateIfNotExist(svcPortName, true)
}

if p.proxyAll {
nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalNodeLocal)
// Install ClusterIP route on Node so that ClusterIP can be accessed on Node. Every time a new ClusterIP
// is created, the routing target IP block will be recalculated for expansion to be able to route the new
// created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR
Expand All @@ -549,6 +603,7 @@ func (p *proxier) installServices() {
}

if p.proxyLoadBalancerIPs {
nGroupID := p.groupCounter.AllocateIfNotExist(svcPortName, externalNodeLocal)
// Service LoadBalancer flows can be partially updated.
var toDelete, toAdd []string
if needRemoval {
Expand Down