Skip to content

Commit

Permalink
Support Internal Traffic Policy in AntreaProxy
Browse files Browse the repository at this point in the history
InternalTrafficPolicy is introduced in Kubernetes 1.21. Service Internal
Traffic Policy enables internal traffic restrictions to only route
internal traffic to Endpoints within the Node the traffic originated
from. The "internal" traffic here refers to traffic originated from Pod
in the current cluster. This can help to reduce costs and improve
performance.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Oct 19, 2021
1 parent fa111a9 commit f6b9cac
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 108 deletions.
169 changes: 117 additions & 52 deletions pkg/agent/proxy/proxier.go
Expand Up @@ -165,21 +165,33 @@ func (p *proxier) removeStaleServices() {
continue
}
}
// Remove Service group whose Endpoints are local.
if svcInfo.NodeLocalExternal() {
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
continue
}
p.groupCounter.Recycle(svcPortName, true)

var internalNodeLocal, externalNodeLocal bool
if svcInfo.NodeLocalInternal() {
internalNodeLocal = true
}
// Remove Service group which has all Endpoints.
groupID, _ := p.groupCounter.Get(svcPortName, false)
if p.proxyAll && svcInfo.NodeLocalExternal() {
externalNodeLocal = true
}
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}

// If internalTrafficPolicy of the Service is Local, a Service group that only has local Endpoints should have been
// already created. If internalTrafficPolicy of the theService is Cluster, a Service group that has all Endpoints
// should have been created. Remove the Service group.
groupID, _ := p.groupCounter.Get(svcPortName, internalNodeLocal)
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
continue
}
if svcInfo.NodePort() != 0 && externalNodeLocal != internalNodeLocal {
groupID, _ = p.groupCounter.Get(svcPortName, externalNodeLocal)
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
continue
}
}

delete(p.serviceInstalledMap, svcPortName)
p.deleteServiceByIP(svcInfo.String())
Expand Down Expand Up @@ -255,7 +267,8 @@ func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool {
svcInfo.Port() != pSvcInfo.Port() ||
svcInfo.OFProtocol != pSvcInfo.OFProtocol ||
svcInfo.NodePort() != pSvcInfo.NodePort() ||
svcInfo.NodeLocalExternal() != pSvcInfo.NodeLocalExternal()
svcInfo.NodeLocalExternal() != pSvcInfo.NodeLocalExternal() ||
svcInfo.NodeLocalInternal() != pSvcInfo.NodeLocalInternal()
}

// smallSliceDifference builds a slice which includes all the strings from s1
Expand Down Expand Up @@ -350,7 +363,6 @@ func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, s
func (p *proxier) installServices() {
for svcPortName, svcPort := range p.serviceMap {
svcInfo := svcPort.(*types.ServiceInfo)
groupID, _ := p.groupCounter.Get(svcPortName, false)
endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName]
if !ok {
endpointsInstalled = map[string]k8sproxy.Endpoint{}
Expand All @@ -374,38 +386,86 @@ func (p *proxier) installServices() {
needUpdateService = true
}

// Get local Endpoints and remote Endpoints from map endpoints.
var localEndpointList, remoteEndpointList []k8sproxy.Endpoint
var needUpdateLocalEndpoints, needUpdateRemoteEndpoints bool
for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
_, installed := endpointsInstalled[endpoint.String()]
if endpoint.GetIsLocal() {
localEndpointList = append(localEndpointList, endpoint)
if !installed {
needUpdateLocalEndpoints = true
}
} else {
remoteEndpointList = append(remoteEndpointList, endpoint)
if !installed {
needUpdateRemoteEndpoints = true
}
}
}

// Get internalTrafficPolicy and externalTrafficPolicy of the Service.
var internalNodeLocal, externalNodeLocal bool
if svcInfo.NodeLocalInternal() {
internalNodeLocal = true
}
if p.proxyAll && svcInfo.NodeLocalExternal() {
externalNodeLocal = true
}

// Get all Endpoints that need to be installed.
var endpointUpdateList []k8sproxy.Endpoint
if len(endpoints) > maxEndpoints {
//TODO: if lib!openflow supports OFPGC_INSERT_BUCKET and more than maxEndpoints Endpoints can be installed,
// this part should be deleted.

// If the number of local and remote Endpoints is more than maxEndpoints, drop some Endpoints.
if !p.oversizeServiceSet.Has(svcPortName.String()) {
klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints)
p.oversizeServiceSet.Insert(svcPortName.String())
}
// If the length of endpoints > maxEndpoints, endpoints should be cut. However, endpoints is a map. Therefore,
// iterate the map and append every Endpoint to a slice endpointList. Since the iteration order of map in
// Golang is random, if cut directly without any sorting, some Endpoints may not be installed. So cutting
// slice endpointList after sorting can avoid this situation in some degree.
var endpointList []k8sproxy.Endpoint
for _, endpoint := range endpoints {
endpointList = append(endpointList, endpoint)
}
sort.Sort(byEndpoint(endpointList))
endpointList = endpointList[:maxEndpoints]

for _, endpoint := range endpointList { // Check if there is any installed Endpoint which is not expected anymore.
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
if len(localEndpointList) > maxEndpoints {
// If the number of local Endpoints is more than maxEndpoints, then drop all remote Endpoints and keep
// maxEndpoints number of local Endpoints.
// Note that, this may lead to create duplicated group when internalTrafficPolicy and externalTrafficPolicy
// are not the same.
sort.Sort(byEndpoint(localEndpointList))
localEndpointList = localEndpointList[:maxEndpoints]
endpointUpdateList = localEndpointList
needUpdateEndpoints = needUpdateLocalEndpoints
} else if len(localEndpointList) <= maxEndpoints {
// When the number of local Endpoints is between 0 and maxEndpoints, keep all local Endpoints.
endpointUpdateList = localEndpointList
needUpdateEndpoints = needUpdateLocalEndpoints
if !internalNodeLocal || !(svcInfo.Port() > 0 && externalNodeLocal) {
// If at least one of internalTrafficPolicy or externalTrafficPolicy is Cluster, remote Endpoints are also
// needed, then keep some remote Endpoints to make the number of all Endpoints which is to be installed
// reaches maxEndpoints. Since slice remoteEndpointList is cut, append every kept Endpoint to slice
// endpointUpdateList and verify that if the Endpoint is installed.
sort.Sort(byEndpoint(remoteEndpointList))
remoteEndpointList = remoteEndpointList[:maxEndpoints-len(localEndpointList)]
for _, endpoint := range remoteEndpointList {
if _, installed := endpointsInstalled[endpoint.String()]; !installed {
needUpdateRemoteEndpoints = true
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}
needUpdateEndpoints = needUpdateEndpoints || needUpdateRemoteEndpoints
}
endpointUpdateList = append(endpointUpdateList, endpoint)
}
} else {
// If the number of local and remote Endpoints is less than or equal to maxEndpoints, don't drop any Endpoints.
if p.oversizeServiceSet.Has(svcPortName.String()) {
p.oversizeServiceSet.Delete(svcPortName.String())
}
for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
needUpdateEndpoints = true
}
endpointUpdateList = append(endpointUpdateList, endpoint)
endpointUpdateList = localEndpointList
needUpdateEndpoints = needUpdateLocalEndpoints
// If at least one of internalTrafficPolicy or externalTrafficPolicy is Cluster, remote Endpoints are all
// needed, then append all remote Endpoints to slice endpointUpdateList.
if !internalNodeLocal || !(svcInfo.Port() > 0 && externalNodeLocal) {
endpointUpdateList = append(endpointUpdateList, remoteEndpointList...)
needUpdateEndpoints = needUpdateEndpoints || needUpdateRemoteEndpoints
}
}

Expand Down Expand Up @@ -443,24 +503,27 @@ func (p *proxier) installServices() {
klog.ErrorS(err, "Error when installing Endpoints flows")
continue
}
err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, endpointUpdateList)
groupID, _ := p.groupCounter.Get(svcPortName, internalNodeLocal)
if internalNodeLocal {
err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList)
} else {
err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, endpointUpdateList)
}
if err != nil {
klog.ErrorS(err, "Error when installing Endpoints groups")
continue
}

// Install another group when Service externalTrafficPolicy is Local.
if p.proxyAll && svcInfo.NodeLocalExternal() {
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
var localEndpointList []k8sproxy.Endpoint
for _, ed := range endpointUpdateList {
if !ed.GetIsLocal() {
continue
}
localEndpointList = append(localEndpointList, ed)
// For NodePort or LoadBalancer Service, if externalTrafficPolicy and internalTrafficPolicy of the Service are
// not the same, another group should be created.
if svcInfo.NodePort() != 0 && externalNodeLocal != internalNodeLocal {
groupID, _ = p.groupCounter.Get(svcPortName, externalNodeLocal)
if externalNodeLocal {
err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList)
} else {
err = p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, endpointUpdateList)
}
if err = p.ofClient.InstallServiceGroup(groupIDLocal, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil {
klog.ErrorS(err, "Error when installing Group for Service whose externalTrafficPolicy is Local")
if err != nil {
klog.ErrorS(err, "Error when installing Endpoints groups")
continue
}
}
Expand Down Expand Up @@ -503,17 +566,19 @@ func (p *proxier) installServices() {
}

// Install ClusterIP flows of current Service.
groupID, _ := p.groupCounter.Get(svcPortName, internalNodeLocal)
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), false, corev1.ServiceTypeClusterIP); err != nil {
klog.Errorf("Error when installing Service flows: %v", err)
continue
}

// If externalTrafficPolicy of the Service is Local, Service NodePort or LoadBalancer should use the Service
// group whose Endpoints are local.
nGroupID := groupID
if svcInfo.NodeLocalExternal() {
nGroupID, _ = p.groupCounter.Get(svcPortName, true)
}
// When creating a Service of NodePort/LoadBalancer, a ClusterIP will be also created. If externalTrafficPolicy
// of the Service is Local and internalTrafficPolicy of the Service is Cluster, Service NodePort/LoadBalancer
// should use the Service group that which has only local Endpoints, rather than the Service group that has all
// Endpoints, which is for the ClusterIP. However, if internalTrafficPolicy of the Service is Local, the Service
// group that has only local Endpoints should have been created for the ClusterIP, Service NodePort/LoadBalancer
// should also use the same Service group.
groupID, _ = p.groupCounter.Get(svcPortName, externalNodeLocal)

if p.proxyAll {
// Install ClusterIP route on Node so that ClusterIP can be accessed on Node. Every time a new ClusterIP
Expand All @@ -528,7 +593,7 @@ func (p *proxier) installServices() {
// If previous Service is nil or NodePort flows and configurations of previous Service have been removed,
// install NodePort flows and configurations for current Service.
if svcInfo.NodePort() > 0 && (pSvcInfo == nil || needRemoval) {
if err := p.installNodePortService(nGroupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), svcInfo.NodeLocalExternal()); err != nil {
if err := p.installNodePortService(groupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), svcInfo.NodeLocalExternal()); err != nil {
klog.ErrorS(err, "Failed to install NodePort flows and configurations of Service", "Service", svcPortName)
continue
}
Expand All @@ -553,7 +618,7 @@ func (p *proxier) installServices() {
}
// Install LoadBalancer flows and configurations.
if len(toAdd) > 0 {
if err := p.installLoadBalancerService(nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), svcInfo.NodeLocalExternal()); err != nil {
if err := p.installLoadBalancerService(groupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), svcInfo.NodeLocalExternal()); err != nil {
klog.ErrorS(err, "Failed to install LoadBalancer flows and configurations of Service", "Service", svcPortName)
continue
}
Expand Down

0 comments on commit f6b9cac

Please sign in to comment.