From 67f3fd934b8a8b935440227a5c8ba7923ba91a2a Mon Sep 17 00:00:00 2001 From: Alex Zhang Date: Sat, 10 Jul 2021 22:31:29 +0800 Subject: [PATCH] chore: endpointslice controller (#574) --- README.md | 2 +- cmd/ingress/ingress.go | 1 + conf/config-default.yaml | 1 + docs/en/latest/FAQ.md | 2 +- pkg/ingress/controller.go | 103 ++++++++- pkg/ingress/endpoint.go | 84 +------ pkg/ingress/endpointslice.go | 210 ++++++++++++++++++ pkg/kube/endpoint.go | 9 +- pkg/kube/translation/translator_test.go | 133 ++++++++++- .../deploy/rbac/apisix_view_clusterrole.yaml | 8 + test/e2e/scaffold/ingress.go | 9 + 11 files changed, 470 insertions(+), 92 deletions(-) diff --git a/README.md b/README.md index 22b06dbd28..df46172b01 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ This project is currently general availability. ## Prerequisites -Apisix ingress controller requires Kubernetes version 1.14+. +Apisix ingress controller requires Kubernetes version 1.15+. ## Apache APISIX Ingress vs. Kubernetes Ingress Nginx diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go index df931ef8ce..de22508c8a 100644 --- a/cmd/ingress/ingress.go +++ b/cmd/ingress/ingress.go @@ -147,6 +147,7 @@ the apisix cluster and others are created`, cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader") cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"") cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, "apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute api group version, can be \"apisix.apache.org/v1\" or \"apisix.apache.org/v2alpha1\"") + cmd.PersistentFlags().BoolVar(&cfg.Kubernetes.WatchEndpointSlices, "watch-endpointslices", false, "whether to watch endpointslices rather than endpoints") cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api (deprecated, using --default-apisix-cluster-base-url instead)") cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api (deprecated, using --default-apisix-cluster-admin-key instead)") cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster") diff --git a/conf/config-default.yaml b/conf/config-default.yaml index 78c4f5d373..b6ec625eb1 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -50,6 +50,7 @@ kubernetes: ingress_version: "networking/v1" # the supported ingress api group version, can be "networking/v1beta1" # , "networking/v1" (for Kubernetes version v1.19.0 or higher), and # "extensions/v1beta1", default is "networking/v1". + watch_endpointslices: false # whether to watch EndpointSlices rather than Endpoints. apisix_route_version: "apisix.apache.org/v2alpha1" # the supported apisixroute api group version, can be # "apisix.apache.org/v1" or "apisix.apache.org/v2alpha1", diff --git a/docs/en/latest/FAQ.md b/docs/en/latest/FAQ.md index 7c92f9a391..8786def8e6 100644 --- a/docs/en/latest/FAQ.md +++ b/docs/en/latest/FAQ.md @@ -49,7 +49,7 @@ Tips: The failure caused by empty upstream nodes is a limitation of Apache APISI 6. What is the retry rule of `apisix-ingress-controller`? -If an error occurs during the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered. +If an error occurs duriREADME.mdng the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered. The delayed retry method is adopted. After the first failure, it is retried once per second. After 5 retries are triggered, the slow retry strategy will be enabled, and the retry will be performed every 1 minute until it succeeds. diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go index 7694f0b017..3c3d56604c 100644 --- a/pkg/ingress/controller.go +++ b/pkg/ingress/controller.go @@ -21,8 +21,11 @@ import ( "sync" "time" + apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" + configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -107,10 +110,11 @@ type Controller struct { apisixConsumerLister listersv2alpha1.ApisixConsumerLister // resource controllers - podController *podController - endpointsController *endpointsController - ingressController *ingressController - secretController *secretController + podController *podController + endpointsController *endpointsController + endpointSliceController *endpointSliceController + ingressController *ingressController + secretController *secretController apisixUpstreamController *apisixUpstreamController apisixRouteController *apisixRouteController @@ -237,8 +241,12 @@ func (c *Controller) initWhenStartLeading() { c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer() c.apisixConsumerInformer = apisixFactory.Apisix().V2alpha1().ApisixConsumers().Informer() + if c.cfg.Kubernetes.WatchEndpointSlices { + c.endpointSliceController = c.newEndpointSliceController() + } else { + c.endpointsController = c.newEndpointsController() + } c.podController = c.newPodController() - c.endpointsController = c.newEndpointsController() c.apisixUpstreamController = c.newApisixUpstreamController() c.ingressController = c.newIngressController() c.apisixRouteController = c.newApisixRouteController() @@ -429,7 +437,11 @@ func (c *Controller) run(ctx context.Context) { c.podController.run(ctx) }) c.goAttach(func() { - c.endpointsController.run(ctx) + if c.cfg.Kubernetes.WatchEndpointSlices { + c.endpointSliceController.run(ctx) + } else { + c.endpointsController.run(ctx) + } }) c.goAttach(func() { c.apisixUpstreamController.run(ctx) @@ -508,6 +520,85 @@ func (c *Controller) syncConsumer(ctx context.Context, consumer *apisixv1.Consum } return } + +func (c *Controller) syncEndpoint(ctx context.Context, ep kube.Endpoint) error { + namespace := ep.Namespace() + svcName := ep.ServiceName() + svc, err := c.svcLister.Services(ep.Namespace()).Get(svcName) + if err != nil { + if k8serrors.IsNotFound(err) { + log.Infof("service %s/%s not found", ep.Namespace(), svcName) + return nil + } + log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err) + return err + } + var subsets []configv1.ApisixUpstreamSubset + subsets = append(subsets, configv1.ApisixUpstreamSubset{}) + au, err := c.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName) + if err != nil { + if !k8serrors.IsNotFound(err) { + log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err) + return err + } + } else if len(au.Spec.Subsets) > 0 { + subsets = append(subsets, au.Spec.Subsets...) + } + + clusters := c.apisix.ListClusters() + for _, port := range svc.Spec.Ports { + for _, subset := range subsets { + nodes, err := c.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels) + if err != nil { + log.Errorw("failed to translate upstream nodes", + zap.Error(err), + zap.Any("endpoints", ep), + zap.Int32("port", port.Port), + ) + } + name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port) + for _, cluster := range clusters { + if err := c.syncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil { + return err + } + } + } + } + return nil +} + +func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error { + upstream, err := cluster.Upstream().Get(ctx, upsName) + if err != nil { + if err == apisixcache.ErrNotFound { + log.Warnw("upstream is not referenced", + zap.String("cluster", cluster.String()), + zap.String("upstream", upsName), + ) + return nil + } else { + log.Errorw("failed to get upstream", + zap.String("upstream", upsName), + zap.String("cluster", cluster.String()), + zap.Error(err), + ) + return err + } + } + + upstream.Nodes = nodes + + log.Debugw("upstream binds new nodes", + zap.Any("upstream", upstream), + zap.String("cluster", cluster.String()), + ) + + updated := &manifest{ + upstreams: []*apisixv1.Upstream{upstream}, + } + return c.syncManifests(ctx, nil, updated, nil) +} + func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) { defer cancelFunc() for { diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go index fced1dd51d..c2eb236b8c 100644 --- a/pkg/ingress/endpoint.go +++ b/pkg/ingress/endpoint.go @@ -16,21 +16,16 @@ package ingress import ( "context" - "github.com/apache/apisix-ingress-controller/pkg/kube" "time" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "github.com/apache/apisix-ingress-controller/pkg/apisix" - apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" - configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" + "github.com/apache/apisix-ingress-controller/pkg/kube" "github.com/apache/apisix-ingress-controller/pkg/log" "github.com/apache/apisix-ingress-controller/pkg/types" - apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) type endpointsController struct { @@ -89,82 +84,7 @@ func (c *endpointsController) run(ctx context.Context) { func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error { ep := ev.Object.(kube.Endpoint) - namespace := ep.Namespace() - svcName := ep.ServiceName() - svc, err := c.controller.svcLister.Services(ep.Namespace()).Get(svcName) - if err != nil { - if k8serrors.IsNotFound(err) { - log.Infof("service %s/%s not found", ep.Namespace(), svcName) - return nil - } - log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err) - return err - } - var subsets []configv1.ApisixUpstreamSubset - subsets = append(subsets, configv1.ApisixUpstreamSubset{}) - au, err := c.controller.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName) - if err != nil { - if !k8serrors.IsNotFound(err) { - log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err) - return err - } - } else if len(au.Spec.Subsets) > 0 { - subsets = append(subsets, au.Spec.Subsets...) - } - - clusters := c.controller.apisix.ListClusters() - for _, port := range svc.Spec.Ports { - for _, subset := range subsets { - nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels) - if err != nil { - log.Errorw("failed to translate upstream nodes", - zap.Error(err), - zap.Any("endpoints", ep), - zap.Int32("port", port.Port), - ) - } - name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port) - for _, cluster := range clusters { - if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil { - return err - } - } - } - } - - return nil -} - -func (c *endpointsController) syncToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error { - upstream, err := cluster.Upstream().Get(ctx, upsName) - if err != nil { - if err == apisixcache.ErrNotFound { - log.Warnw("upstream is not referenced", - zap.String("cluster", cluster.String()), - zap.String("upstream", upsName), - ) - return nil - } else { - log.Errorw("failed to get upstream", - zap.String("upstream", upsName), - zap.String("cluster", cluster.String()), - zap.Error(err), - ) - return err - } - } - - upstream.Nodes = nodes - - log.Debugw("upstream binds new nodes", - zap.Any("upstream", upstream), - zap.String("cluster", cluster.String()), - ) - - updated := &manifest{ - upstreams: []*apisixv1.Upstream{upstream}, - } - return c.controller.syncManifests(ctx, nil, updated, nil) + return c.controller.syncEndpoint(ctx, ep) } func (c *endpointsController) handleSyncErr(obj interface{}, err error) { diff --git a/pkg/ingress/endpointslice.go b/pkg/ingress/endpointslice.go index ea5119c619..c8eaa059ee 100644 --- a/pkg/ingress/endpointslice.go +++ b/pkg/ingress/endpointslice.go @@ -13,3 +13,213 @@ // See the License for the specific language governing permissions and // limitations under the License. package ingress + +import ( + "context" + "time" + + "go.uber.org/zap" + discoveryv1 "k8s.io/api/discovery/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/types" +) + +const ( + _endpointSlicesManagedBy = "endpointslice-controller.k8s.io" +) + +type endpointSliceEvent struct { + Key string + ServiceName string +} + +type endpointSliceController struct { + controller *Controller + workqueue workqueue.RateLimitingInterface + workers int +} + +func (c *Controller) newEndpointSliceController() *endpointSliceController { + ctl := &endpointSliceController{ + controller: c, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(time.Second, 60*time.Second, 5), "endpointSlice"), + workers: 1, + } + + ctl.controller.epInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: ctl.onAdd, + UpdateFunc: ctl.onUpdate, + DeleteFunc: ctl.onDelete, + }, + ) + + return ctl +} + +func (c *endpointSliceController) run(ctx context.Context) { + log.Info("endpointSlice controller started") + defer log.Info("endpointSlice controller exited") + defer c.workqueue.ShutDown() + + if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.epInformer.HasSynced); !ok { + log.Error("informers sync failed") + return + } + + handler := func() { + for { + obj, shutdown := c.workqueue.Get() + if shutdown { + return + } + + err := c.sync(ctx, obj.(*types.Event)) + c.workqueue.Done(obj) + c.handleSyncErr(obj, err) + } + } + + for i := 0; i < c.workers; i++ { + go handler() + } + + <-ctx.Done() +} + +func (c *endpointSliceController) sync(ctx context.Context, ev *types.Event) error { + epEvent := ev.Object.(endpointSliceEvent) + namespace, _, err := cache.SplitMetaNamespaceKey(epEvent.Key) + if err != nil { + log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", epEvent.Key) + return nil + } + ep, err := c.controller.epLister.GetEndpointSlices(namespace, epEvent.ServiceName) + if err != nil { + log.Errorf("failed to get all endpointSlices for service %s: %s", + epEvent.ServiceName, err) + return err + } + return c.controller.syncEndpoint(ctx, ep) +} + +func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) { + if err == nil { + c.workqueue.Forget(obj) + return + } + log.Warnw("sync endpointSlice failed, will retry", + zap.Any("object", obj), + ) + c.workqueue.AddRateLimited(obj) +} + +func (c *endpointSliceController) onAdd(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorf("found endpointSlice object with bad namespace") + } + if !c.controller.namespaceWatching(key) { + return + } + ep := obj.(*discoveryv1.EndpointSlice) + svcName := ep.Labels[discoveryv1.LabelServiceName] + if svcName == "" { + return + } + if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy { + // We only care about endpointSlice objects managed by the EndpointSlices + // controller. + return + } + + log.Debugw("endpointSlice add event arrived", + zap.String("object-key", key), + ) + + c.workqueue.AddRateLimited(&types.Event{ + Type: types.EventAdd, + Object: endpointSliceEvent{ + Key: key, + ServiceName: svcName, + }, + }) +} + +func (c *endpointSliceController) onUpdate(prev, curr interface{}) { + prevEp := prev.(*discoveryv1.EndpointSlice) + currEp := curr.(*discoveryv1.EndpointSlice) + + if prevEp.GetResourceVersion() == currEp.GetResourceVersion() { + return + } + key, err := cache.MetaNamespaceKeyFunc(currEp) + if err != nil { + log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", err) + return + } + if !c.controller.namespaceWatching(key) { + return + } + if currEp.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy { + // We only care about endpointSlice objects managed by the EndpointSlices + // controller. + return + } + svcName := currEp.Labels[discoveryv1.LabelServiceName] + if svcName == "" { + return + } + + log.Debugw("endpointSlice update event arrived", + zap.Any("new object", currEp), + zap.Any("old object", prevEp), + ) + c.workqueue.AddRateLimited(&types.Event{ + Type: types.EventUpdate, + // TODO pass key. + Object: endpointSliceEvent{ + Key: key, + ServiceName: svcName, + }, + }) +} + +func (c *endpointSliceController) onDelete(obj interface{}) { + ep, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + log.Errorf("found endpoints: %+v in bad tombstone state", obj) + return + } + ep = tombstone.Obj.(*discoveryv1.EndpointSlice) + } + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorf("found endpointSlice object with bad namespace/name: %s, ignore it", err) + return + } + if !c.controller.namespaceWatching(key) { + return + } + if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy { + // We only care about endpointSlice objects managed by the EndpointSlices + // controller. + return + } + svcName := ep.Labels[discoveryv1.LabelServiceName] + log.Debugw("endpoints delete event arrived", + zap.Any("object-key", key), + ) + c.workqueue.AddRateLimited(&types.Event{ + Type: types.EventDelete, + Object: endpointSliceEvent{ + Key: key, + ServiceName: svcName, + }, + }) +} diff --git a/pkg/kube/endpoint.go b/pkg/kube/endpoint.go index 27eba844ab..995a73c39c 100644 --- a/pkg/kube/endpoint.go +++ b/pkg/kube/endpoint.go @@ -59,7 +59,7 @@ func (lister *endpointLister) GetEndpoint(namespace, name string) (Endpoint, err } func (lister *endpointLister) GetEndpointSlices(namespace, svcName string) (Endpoint, error) { - if lister.epsLister != nil { + if lister.epsLister == nil { panic("not a endpointSlice lister") } selector := labels.SelectorFromSet(labels.Set{ @@ -174,3 +174,10 @@ func NewEndpoint(ep *corev1.Endpoints) Endpoint { endpoint: ep, } } + +// NewEndpointWithSlice creates an Endpoint which entity is Kubernetes EndpointSlices. +func NewEndpointWithSlice(ep *discoveryv1.EndpointSlice) Endpoint { + return &endpoint{ + endpointSlices: []*discoveryv1.EndpointSlice{ep}, + } +} diff --git a/pkg/kube/translation/translator_test.go b/pkg/kube/translation/translator_test.go index aa9bd70300..f744403feb 100644 --- a/pkg/kube/translation/translator_test.go +++ b/pkg/kube/translation/translator_test.go @@ -16,9 +16,11 @@ package translation import ( "context" - "github.com/apache/apisix-ingress-controller/pkg/kube" "testing" + "github.com/apache/apisix-ingress-controller/pkg/kube" + discoveryv1 "k8s.io/api/discovery/v1" + apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" @@ -213,3 +215,132 @@ func TestTranslateUpstreamNodes(t *testing.T) { }, }) } + +func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) { + svc := &corev1.Service{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "test", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "port1", + Port: 80, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 9080, + }, + }, + { + Name: "port2", + Port: 443, + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 9443, + }, + }, + }, + }, + } + isTrue := true + port1 := int32(9080) + port2 := int32(9443) + port1Name := "port1" + port2Name := "port2" + ep := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "test", + Labels: map[string]string{ + discoveryv1.LabelManagedBy: "endpointslice-controller.k8s.io", + discoveryv1.LabelServiceName: "svc", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{ + "192.168.1.1", + "192.168.1.2", + }, + Conditions: discoveryv1.EndpointConditions{ + Ready: &isTrue, + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: &port1Name, + Port: &port1, + }, + { + Name: &port2Name, + Port: &port2, + }, + }, + } + + client := fake.NewSimpleClientset() + informersFactory := informers.NewSharedInformerFactory(client, 0) + svcInformer := informersFactory.Core().V1().Services().Informer() + svcLister := informersFactory.Core().V1().Services().Lister() + + processCh := make(chan struct{}) + svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + processCh <- struct{}{} + }, + }) + + stopCh := make(chan struct{}) + defer close(stopCh) + go svcInformer.Run(stopCh) + cache.WaitForCacheSync(stopCh, svcInformer.HasSynced) + + _, err := client.CoreV1().Services("test").Create(context.Background(), svc, metav1.CreateOptions{}) + assert.Nil(t, err) + + tr := &translator{&TranslatorOptions{ + ServiceLister: svcLister, + }} + <-processCh + + nodes, err := tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 10080, nil) + assert.Nil(t, nodes) + assert.Equal(t, err, &translateError{ + field: "service.spec.ports", + reason: "port not defined", + }) + + nodes, err = tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 80, nil) + assert.Nil(t, err) + assert.Equal(t, nodes, apisixv1.UpstreamNodes{ + { + Host: "192.168.1.1", + Port: 9080, + Weight: 100, + }, + { + Host: "192.168.1.2", + Port: 9080, + Weight: 100, + }, + }) + + nodes, err = tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 443, nil) + assert.Nil(t, err) + assert.Equal(t, nodes, apisixv1.UpstreamNodes{ + { + Host: "192.168.1.1", + Port: 9443, + Weight: 100, + }, + { + Host: "192.168.1.2", + Port: 9443, + Weight: 100, + }, + }) +} diff --git a/samples/deploy/rbac/apisix_view_clusterrole.yaml b/samples/deploy/rbac/apisix_view_clusterrole.yaml index ef9d3427aa..7a9ff16232 100644 --- a/samples/deploy/rbac/apisix_view_clusterrole.yaml +++ b/samples/deploy/rbac/apisix_view_clusterrole.yaml @@ -157,3 +157,11 @@ rules: - leases verbs: - '*' +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go index 578907a860..2b97082550 100644 --- a/test/e2e/scaffold/ingress.go +++ b/test/e2e/scaffold/ingress.go @@ -168,6 +168,14 @@ rules: - leases verbs: - '*' + - apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch ` _clusterRoleBinding = ` apiVersion: rbac.authorization.k8s.io/v1 @@ -256,6 +264,7 @@ spec: - %s,kube-system - --apisix-route-version - %s + - --watch-endpointslices serviceAccount: ingress-apisix-e2e-test-service-account ` )