chore: endpointslice controller (#574)
tokers committed Jul 10, 2021
1 parent 1c17b41 commit 67f3fd9
Showing 11 changed files with 470 additions and 92 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -58,7 +58,7 @@ This project is currently general availability.

## Prerequisites

-Apisix ingress controller requires Kubernetes version 1.14+.
+Apisix ingress controller requires Kubernetes version 1.15+.

## Apache APISIX Ingress vs. Kubernetes Ingress Nginx

1 change: 1 addition & 0 deletions cmd/ingress/ingress.go
@@ -147,6 +147,7 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, "apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute api group version, can be \"apisix.apache.org/v1\" or \"apisix.apache.org/v2alpha1\"")
+	cmd.PersistentFlags().BoolVar(&cfg.Kubernetes.WatchEndpointSlices, "watch-endpointslices", false, "whether to watch endpointslices rather than endpoints")
cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api (deprecated, using --default-apisix-cluster-base-url instead)")
cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api (deprecated, using --default-apisix-cluster-admin-key instead)")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster")
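For context, the new `--watch-endpointslices` switch is opt-in (default false). A sketch of an invocation that enables it, assuming the usual `ingress` subcommand; the admin API URL and key below are placeholders, not values from this commit:

    apisix-ingress-controller ingress \
        --watch-endpointslices \
        --default-apisix-cluster-base-url http://127.0.0.1:9180/apisix/admin \
        --default-apisix-cluster-admin-key <placeholder-admin-key>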
1 change: 1 addition & 0 deletions conf/config-default.yaml
@@ -50,6 +50,7 @@ kubernetes:
ingress_version: "networking/v1" # the supported ingress api group version, can be "networking/v1beta1"
# , "networking/v1" (for Kubernetes version v1.19.0 or higher), and
# "extensions/v1beta1", default is "networking/v1".
+  watch_endpointslices: false  # whether to watch EndpointSlices rather than Endpoints.

apisix_route_version: "apisix.apache.org/v2alpha1" # the supported apisixroute api group version, can be
# "apisix.apache.org/v1" or "apisix.apache.org/v2alpha1",
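The same toggle as the flag example above, expressed through the config file instead (a minimal sketch; every other key keeps its default):

    kubernetes:
      watch_endpointslices: true   # watch discovery.k8s.io EndpointSlices instead of core Endpoints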
2 changes: 1 addition & 1 deletion docs/en/latest/FAQ.md
@@ -49,7 +49,7 @@ Tips: The failure caused by empty upstream nodes is a limitation of Apache APISIX

6. What is the retry rule of `apisix-ingress-controller`?

-If an error occurs during the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered.
+If an error occurs duriREADME.mdng the process of `apisix-ingress-controller` parsing CRD and distributing the configuration to APISIX, a retry will be triggered.

The delayed retry method is adopted. After the first failure, it is retried once per second. After 5 retries are triggered, the slow retry strategy will be enabled, and the retry will be performed every 1 minute until it succeeds.

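The cadence described above (one retry per second for the first five failures, then one per minute) is exactly what client-go's fast/slow rate limiter produces. A hypothetical sketch of building such a queue, not code taken from this commit:

    import (
        "time"

        "k8s.io/client-go/util/workqueue"
    )

    // newRetryQueue retries failed items once per second for the first
    // 5 attempts, then once per minute until processing succeeds.
    func newRetryQueue() workqueue.RateLimitingInterface {
        return workqueue.NewRateLimitingQueue(
            workqueue.NewItemFastSlowRateLimiter(time.Second, time.Minute, 5),
        )
    }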
103 changes: 97 additions & 6 deletions pkg/ingress/controller.go
@@ -21,8 +21,11 @@ import (
"sync"
"time"

+	apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
+	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -107,10 +110,11 @@ type Controller struct {
apisixConsumerLister listersv2alpha1.ApisixConsumerLister

// resource controllers
-	podController       *podController
-	endpointsController *endpointsController
-	ingressController   *ingressController
-	secretController    *secretController
+	podController           *podController
+	endpointsController     *endpointsController
+	endpointSliceController *endpointSliceController
+	ingressController       *ingressController
+	secretController        *secretController

apisixUpstreamController *apisixUpstreamController
apisixRouteController *apisixRouteController
@@ -237,8 +241,12 @@ func (c *Controller) initWhenStartLeading() {
c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()
c.apisixConsumerInformer = apisixFactory.Apisix().V2alpha1().ApisixConsumers().Informer()

+	if c.cfg.Kubernetes.WatchEndpointSlices {
+		c.endpointSliceController = c.newEndpointSliceController()
+	} else {
+		c.endpointsController = c.newEndpointsController()
+	}
 	c.podController = c.newPodController()
-	c.endpointsController = c.newEndpointsController()
c.apisixUpstreamController = c.newApisixUpstreamController()
c.ingressController = c.newIngressController()
c.apisixRouteController = c.newApisixRouteController()
@@ -429,7 +437,11 @@ func (c *Controller) run(ctx context.Context) {
c.podController.run(ctx)
})
c.goAttach(func() {
-		c.endpointsController.run(ctx)
+		if c.cfg.Kubernetes.WatchEndpointSlices {
+			c.endpointSliceController.run(ctx)
+		} else {
+			c.endpointsController.run(ctx)
+		}
})
c.goAttach(func() {
c.apisixUpstreamController.run(ctx)
@@ -508,6 +520,85 @@ func (c *Controller) syncConsumer(ctx context.Context, consumer *apisixv1.Consumer
}
return
}

+func (c *Controller) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
+	namespace := ep.Namespace()
+	svcName := ep.ServiceName()
+	svc, err := c.svcLister.Services(ep.Namespace()).Get(svcName)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			log.Infof("service %s/%s not found", ep.Namespace(), svcName)
+			return nil
+		}
+		log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
+		return err
+	}
+	var subsets []configv1.ApisixUpstreamSubset
+	subsets = append(subsets, configv1.ApisixUpstreamSubset{})
+	au, err := c.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
+	if err != nil {
+		if !k8serrors.IsNotFound(err) {
+			log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
+			return err
+		}
+	} else if len(au.Spec.Subsets) > 0 {
+		subsets = append(subsets, au.Spec.Subsets...)
+	}
+
+	clusters := c.apisix.ListClusters()
+	for _, port := range svc.Spec.Ports {
+		for _, subset := range subsets {
+			nodes, err := c.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
+			if err != nil {
+				log.Errorw("failed to translate upstream nodes",
+					zap.Error(err),
+					zap.Any("endpoints", ep),
+					zap.Int32("port", port.Port),
+				)
+			}
+			name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
+			for _, cluster := range clusters {
+				if err := c.syncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
+
+func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
+	upstream, err := cluster.Upstream().Get(ctx, upsName)
+	if err != nil {
+		if err == apisixcache.ErrNotFound {
+			log.Warnw("upstream is not referenced",
+				zap.String("cluster", cluster.String()),
+				zap.String("upstream", upsName),
+			)
+			return nil
+		} else {
+			log.Errorw("failed to get upstream",
+				zap.String("upstream", upsName),
+				zap.String("cluster", cluster.String()),
+				zap.Error(err),
+			)
+			return err
+		}
+	}
+
+	upstream.Nodes = nodes
+
+	log.Debugw("upstream binds new nodes",
+		zap.Any("upstream", upstream),
+		zap.String("cluster", cluster.String()),
+	)
+
+	updated := &manifest{
+		upstreams: []*apisixv1.Upstream{upstream},
+	}
+	return c.syncManifests(ctx, nil, updated, nil)
+}

func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
defer cancelFunc()
for {
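A note on the subsets loop in syncEndpoint above: subsets always starts with an empty ApisixUpstreamSubset{}, i.e. the default, label-free node set, so every service port yields at least one upstream; any subsets declared on a matching ApisixUpstream then add label-filtered variants, one upstream per (port, subset) pair named via ComposeUpstreamName. A hypothetical resource that would exercise that path (names are examples only, not from this commit):

    apiVersion: apisix.apache.org/v1
    kind: ApisixUpstream
    metadata:
      name: httpbin        # must match the Service name
      namespace: default
    spec:
      subsets:
        - name: canary     # only endpoints whose pods carry version=v2
          labels:
            version: v2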
84 changes: 2 additions & 82 deletions pkg/ingress/endpoint.go
@@ -16,21 +16,16 @@ package ingress

import (
"context"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"time"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/apache/apisix-ingress-controller/pkg/apisix"
apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
-	apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

type endpointsController struct {
@@ -89,82 +84,7 @@ func (c *endpointsController) run(ctx context.Context) {

func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
ep := ev.Object.(kube.Endpoint)
-	namespace := ep.Namespace()
-	svcName := ep.ServiceName()
-	svc, err := c.controller.svcLister.Services(ep.Namespace()).Get(svcName)
-	if err != nil {
-		if k8serrors.IsNotFound(err) {
-			log.Infof("service %s/%s not found", ep.Namespace(), svcName)
-			return nil
-		}
-		log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
-		return err
-	}
-	var subsets []configv1.ApisixUpstreamSubset
-	subsets = append(subsets, configv1.ApisixUpstreamSubset{})
-	au, err := c.controller.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
-	if err != nil {
-		if !k8serrors.IsNotFound(err) {
-			log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
-			return err
-		}
-	} else if len(au.Spec.Subsets) > 0 {
-		subsets = append(subsets, au.Spec.Subsets...)
-	}
-
-	clusters := c.controller.apisix.ListClusters()
-	for _, port := range svc.Spec.Ports {
-		for _, subset := range subsets {
-			nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
-			if err != nil {
-				log.Errorw("failed to translate upstream nodes",
-					zap.Error(err),
-					zap.Any("endpoints", ep),
-					zap.Int32("port", port.Port),
-				)
-			}
-			name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
-			for _, cluster := range clusters {
-				if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil {
-					return err
-				}
-			}
-		}
-	}
-
-	return nil
-}
-
-func (c *endpointsController) syncToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
-	upstream, err := cluster.Upstream().Get(ctx, upsName)
-	if err != nil {
-		if err == apisixcache.ErrNotFound {
-			log.Warnw("upstream is not referenced",
-				zap.String("cluster", cluster.String()),
-				zap.String("upstream", upsName),
-			)
-			return nil
-		} else {
-			log.Errorw("failed to get upstream",
-				zap.String("upstream", upsName),
-				zap.String("cluster", cluster.String()),
-				zap.Error(err),
-			)
-			return err
-		}
-	}
-
-	upstream.Nodes = nodes
-
-	log.Debugw("upstream binds new nodes",
-		zap.Any("upstream", upstream),
-		zap.String("cluster", cluster.String()),
-	)
-
-	updated := &manifest{
-		upstreams: []*apisixv1.Upstream{upstream},
-	}
-	return c.controller.syncManifests(ctx, nil, updated, nil)
+	return c.controller.syncEndpoint(ctx, ep)
}

func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
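Both the Endpoints controller above and the new EndpointSlice controller now funnel into Controller.syncEndpoint through the kube.Endpoint abstraction, which is why the kube import moves up and nearly all of this file's sync logic disappears. From the calls visible in this diff it must expose at least the following; a hypothetical reconstruction, the real definition in pkg/kube may differ:

    // Sketch inferred only from usage in this diff.
    type Endpoint interface {
        Namespace() string   // namespace of the backing Endpoints or EndpointSlice
        ServiceName() string // name of the Service the object belongs to
        // ...plus whatever TranslateUpstreamNodes needs in order to
        // enumerate ready addresses for a given service port.
    }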
