
Extends the Endpoints support from 500 to 800; extra ones will be dropped in AntreaProxy (antrea-io#2101)

For antrea-io#2092

Due to the OpenFlow message size limit and the implementation of Services in
AntreaProxy, the maximum number of Endpoints that AntreaProxy can support now
is 800. If the number of Endpoints in a given Service exceeds 800, the extra
Endpoints will be dropped and a warning will be logged.

In AntreaProxy, the OVS group is the key part of the Service implementation.
For now, Antrea uses OpenFlow 1.3 to communicate with OVS. In the previous
design, every bucket of an OVS group had five actions: two actions that load
the Endpoint IP and port into registers, a resubmit action, and two more
actions that load values into registers. The first three must be preserved in
the bucket, but the other two can be moved to flows (in this patch, they are
moved to table 41), so a single message can hold more buckets. As a result,
the maximum number of Endpoints has increased from 511 to 800. To ensure that
AntreaProxy keeps running correctly, any extra Endpoints are dropped.
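
For intuition, a back-of-the-envelope byte budget reproduces both numbers. The sizes used in the sketch below are assumptions for illustration (a 16-byte bucket header, 24 bytes per Nicira register-load action, 16 bytes per resubmit action, and the 65535-byte cap implied by OpenFlow's 16-bit message length field); none of them are constants defined in this patch:

package main

import "fmt"

const (
	maxMsgLen   = 65535 // OpenFlow messages have a 16-bit length field
	bucketHdr   = 16    // ofp_bucket header (assumed size)
	regLoadLen  = 24    // one Nicira register-load action (assumed size)
	resubmitLen = 16    // one resubmit action (assumed size)
)

// maxBuckets estimates how many buckets fit into a single group-mod
// message when each bucket carries the given number of register loads
// plus one resubmit action.
func maxBuckets(loadsPerBucket int) int {
	bucketLen := bucketHdr + loadsPerBucket*regLoadLen + resubmitLen
	return maxMsgLen / bucketLen
}

func main() {
	fmt.Println(maxBuckets(4)) // previous design (4 loads): 511
	fmt.Println(maxBuckets(2)) // new design (2 loads): 819
}

Under these assumptions the old five-action bucket yields 511 Endpoints per message and the new three-action bucket roughly 819, which is presumably why the new limit was set to a round 800.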
hongliangl authored and antoninbas committed Apr 30, 2021
1 parent 0bcb59b commit aa63718
Showing 7 changed files with 83 additions and 18 deletions.
6 changes: 5 additions & 1 deletion docs/feature-gates.md
@@ -48,7 +48,11 @@ example, to enable `AntreaProxy` on Linux, edit the Agent configuration in the
`AntreaProxy` implements Service load-balancing for ClusterIP Services as part
of the OVS pipeline, as opposed to relying on kube-proxy. This only applies to
traffic originating from Pods, and destined to ClusterIP Services. In
-particular, it does not apply to NodePort Services.
+particular, it does not apply to NodePort Services. Please note that due to
+some restrictions on the implementation of Services in Antrea, the maximum
+number of Endpoints that Antrea can support at the moment is 800. If the
+number of Endpoints for a given Service exceeds 800, extra Endpoints will
+be dropped.

Note that this feature must be enabled for Windows. The Antrea Windows YAML
manifest provided as part of releases enables this feature by default. If you
3 changes: 2 additions & 1 deletion pkg/agent/openflow/client.go
@@ -388,6 +388,7 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
+
group := c.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
@@ -447,7 +448,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
-	flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol))
+	flows = append(flows, c.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0))
if affinityTimeout != 0 {
flows = append(flows, c.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout))
}
18 changes: 10 additions & 8 deletions pkg/agent/openflow/pipeline.go
@@ -1769,12 +1769,21 @@ func (c *client) serviceLearnFlow(groupID binding.GroupIDType, svcIP net.IP, svc

// serviceLBFlow generates the flow which uses the specific group to do Endpoint
// selection.
-func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol) binding.Flow {
+func (c *client) serviceLBFlow(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, withSessionAffinity bool) binding.Flow {
+	var lbResultMark uint32
+	if withSessionAffinity {
+		lbResultMark = marksRegServiceNeedLearn
+	} else {
+		lbResultMark = marksRegServiceSelected
+	}
+
return c.pipeline[serviceLBTable].BuildFlow(priorityNormal).
MatchProtocol(protocol).
MatchDstPort(svcPort, nil).
MatchDstIP(svcIP).
MatchRegRange(int(serviceLearnReg), marksRegServiceNeedLB, serviceLearnRegRange).
+		Action().LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
+		Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
Action().Group(groupID).
Cookie(c.cookieAllocator.Request(cookie.Service).Raw()).
Done()
@@ -1840,13 +1849,10 @@ func (c *client) hairpinSNATFlow(endpointIP net.IP) binding.Flow {
func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
group := c.bridge.CreateGroup(groupID).ResetBuckets()
var resubmitTableID binding.TableIDType
-	var lbResultMark uint32
if withSessionAffinity {
resubmitTableID = serviceLBTable
-		lbResultMark = marksRegServiceNeedLearn
} else {
resubmitTableID = endpointDNATTable
-		lbResultMark = marksRegServiceSelected
}

for _, endpoint := range endpoints {
@@ -1859,17 +1865,13 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf
group = group.Bucket().Weight(100).
LoadReg(int(endpointIPReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
-			LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
-			LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(100).
LoadXXReg(int(endpointIPv6XXReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
-			LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
-			LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
ResubmitToTable(resubmitTableID).
Done()
}
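
To see the effect on the datapath, compare the bucket strings that the integration test below matches against. Before this patch, each bucket carried four register loads plus a resubmit (the values here are illustrative; the real ones come from the Endpoint IP and port):

weight:100,actions=load:0xa0a0004->NXM_NX_REG3[],load:0x50->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],resubmit(,42)

After it, only the two Endpoint-specific loads remain in the bucket, while the learn/selected mark and the MAC-rewrite mark are set once per Service by the flow in table 41:

weight:100,actions=load:0xa0a0004->NXM_NX_REG3[],load:0x50->NXM_NX_REG4[0..15],resubmit(,42)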
13 changes: 13 additions & 0 deletions pkg/agent/proxy/endpoints.go
@@ -175,3 +175,16 @@ func (t *endpointsChangesTracker) Update(em types.EndpointsMap) {
}
}
}
+
+// byEndpoint helps sort Endpoint
+type byEndpoint []k8sproxy.Endpoint
+
+func (p byEndpoint) Len() int {
+	return len(p)
+}
+func (p byEndpoint) Swap(i, j int) {
+	p[i], p[j] = p[j], p[i]
+}
+func (p byEndpoint) Less(i, j int) bool {
+	return p[i].String() < p[j].String()
+}
48 changes: 44 additions & 4 deletions pkg/agent/proxy/proxier.go
@@ -17,11 +17,13 @@ package proxy
import (
"fmt"
"net"
"sort"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
@@ -38,6 +40,10 @@ import (
const (
resyncPeriod = time.Minute
componentName = "antrea-agent-proxy"
+	// Due to the maximum message size in Openflow 1.3 and the implementation of Services in Antrea, the maximum number
+	// of Endpoints that Antrea can support at the moment is 800. If the number of Endpoints for a given Service exceeds
+	// 800, extra Endpoints will be dropped.
+	maxEndpoints = 800
)

// TODO: Add metrics
@@ -67,6 +73,8 @@ type proxier struct {
serviceStringMap map[string]k8sproxy.ServicePortName
// serviceStringMapMutex protects serviceStringMap object.
serviceStringMapMutex sync.Mutex
+	// oversizeServiceSet records the Services that have more than 800 Endpoints.
+	oversizeServiceSet sets.String

runner *k8sproxy.BoundedFrequencyRunner
stopChan <-chan struct{}
@@ -94,6 +102,9 @@ func (p *proxier) removeStaleServices() {
}
svcInfo := svcPort.(*types.ServiceInfo)
klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String())
+	if p.oversizeServiceSet.Has(svcPortName.String()) {
+		p.oversizeServiceSet.Delete(svcPortName.String())
+	}
if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil {
klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err)
continue
@@ -235,12 +246,40 @@ func (p *proxier) installServices() {
}

var endpointUpdateList []k8sproxy.Endpoint
-	for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
-		if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
-			needUpdateEndpoints = true
+	if len(endpoints) > maxEndpoints {
+		if !p.oversizeServiceSet.Has(svcPortName.String()) {
+			klog.Warningf("Since Endpoints of Service %s exceeds %d, extra Endpoints will be dropped", svcPortName.String(), maxEndpoints)
+			p.oversizeServiceSet.Insert(svcPortName.String())
+		}
+		// If the number of Endpoints is greater than maxEndpoints, the list should be cut. However, endpoints is a
+		// map, so iterate over it and append every Endpoint to the slice endpointList. Since the iteration order of
+		// a map in Go is random, cutting directly without sorting could leave some Endpoints permanently
+		// uninstalled. Cutting the slice endpointList after sorting avoids this situation to some degree.
+		var endpointList []k8sproxy.Endpoint
+		for _, endpoint := range endpoints {
+			endpointList = append(endpointList, endpoint)
+		}
+		sort.Sort(byEndpoint(endpointList))
+		endpointList = endpointList[:maxEndpoints]
+
+		for _, endpoint := range endpointList { // Check if there is any installed Endpoint which is not expected anymore.
+			if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
+				needUpdateEndpoints = true
+			}
+			endpointUpdateList = append(endpointUpdateList, endpoint)
+		}
+	} else {
+		if p.oversizeServiceSet.Has(svcPortName.String()) {
+			p.oversizeServiceSet.Delete(svcPortName.String())
+		}
+		for _, endpoint := range endpoints { // Check if there is any installed Endpoint which is not expected anymore.
+			if _, ok := endpointsInstalled[endpoint.String()]; !ok { // There is an expected Endpoint which is not installed.
+				needUpdateEndpoints = true
+			}
+			endpointUpdateList = append(endpointUpdateList, endpoint)
+		}
-		endpointUpdateList = append(endpointUpdateList, endpoint)
}

if len(endpoints) < len(endpointsInstalled) { // There are Endpoints which expired.
klog.V(2).Infof("Some Endpoints of Service %s removed, updating Endpoints", svcInfo.String())
needUpdateEndpoints = true
@@ -468,6 +507,7 @@ func NewProxier(
endpointsMap: types.EndpointsMap{},
endpointReferenceCounter: map[string]int{},
serviceStringMap: map[string]k8sproxy.ServicePortName{},
+		oversizeServiceSet: sets.NewString(),
groupCounter: types.NewGroupCounter(),
ofClient: ofClient,
isIPv6: isIPv6,
4 changes: 2 additions & 2 deletions test/e2e/proxy_test.go
@@ -284,9 +284,9 @@ func testProxyServiceLifeCycle(ipFamily *corev1.IPFamily, ingressIPs []string, d

var groupKeyword string
if *ipFamily == corev1.IPv6Protocol {
groupKeyword = fmt.Sprintf("set_field:0x%s->xxreg3,load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv6.To16()), "0"), 80)
groupKeyword = fmt.Sprintf("set_field:0x%s->xxreg3,load:0x%x->NXM_NX_REG4[0..15]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv6.To16()), "0"), 80)
} else {
groupKeyword = fmt.Sprintf("load:0x%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"), 80)
groupKeyword = fmt.Sprintf("load:0x%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15]", strings.TrimLeft(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"), 80)
}
groupOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-groups", defaultBridgeName})
require.NoError(t, err)
9 changes: 7 additions & 2 deletions test/integration/agent/openflow_test.go
@@ -591,11 +591,16 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
nw_proto = 132
learnProtoField = "OXM_OF_SCTP_DST[]"
}
+
+	serviceLearnReg := 2
+	if stickyAge != 0 {
+		serviceLearnReg = 3
+	}
cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)
svcFlows := expectTableFlows{tableID: 41, flows: []*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
ActStr: fmt.Sprintf("group:%d", gid),
ActStr: fmt.Sprintf("load:0x%x->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],group:%d", serviceLearnReg, gid),
},
{
MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
@@ -608,7 +613,7 @@
for _, ep := range endpointList {
epIP := ipToHexString(net.ParseIP(ep.IP()))
epPort, _ := ep.Port()
bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[19],resubmit(,42)", epIP, epPort)
bucket := fmt.Sprintf("weight:100,actions=load:%s->NXM_NX_REG3[],load:0x%x->NXM_NX_REG4[0..15],resubmit(,42)", epIP, epPort)
groupBuckets = append(groupBuckets, bucket)

unionVal := (0b010 << 16) + uint32(epPort)
