Skip to content

Commit

Permalink
feat: subset changes in controllers (#507)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokers committed Jun 2, 2021
1 parent c6ac8a4 commit 3337be7
Show file tree
Hide file tree
Showing 19 changed files with 412 additions and 107 deletions.
3 changes: 3 additions & 0 deletions docs/en/latest/references/apisix_upstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,6 @@ title: ApisixUpstream Reference
| portLevelSettings.scheme | string | same as `scheme` but takes higher precedence. |
| portLevelSettings.loadbalancer | object | same as `loadbalancer` but takes higher precedence. |
| portLevelSettings.healthCheck | object | same as `healthCheck` but takes higher precedence. |
| subsets | array | service subset list, use pod labels to organize service endpoints to different groups. |
| subsets[].name | string | the subset name. |
| subsets[].labels | object | the subset label map. |
89 changes: 48 additions & 41 deletions pkg/ingress/apisix_upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,58 +129,65 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
return err
}

var subsets []configv1.ApisixUpstreamSubset
subsets = append(subsets, configv1.ApisixUpstreamSubset{})
if len(au.Spec.Subsets) > 0 {
subsets = append(subsets, au.Spec.Subsets...)
}
clusterName := c.controller.cfg.APISIX.DefaultClusterName
for _, port := range svc.Spec.Ports {
upsName := apisixv1.ComposeUpstreamName(namespace, name, port.Port)
// TODO: multiple cluster
ups, err := c.controller.apisix.Cluster(clusterName).Upstream().Get(ctx, upsName)
if err != nil {
if err == apisixcache.ErrNotFound {
continue
}
log.Errorf("failed to get upstream %s: %s", upsName, err)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
var newUps *apisixv1.Upstream
if ev.Type != types.EventDelete {
cfg, ok := portLevelSettings[port.Port]
if !ok {
cfg = &au.Spec.ApisixUpstreamConfig
}
// FIXME Same ApisixUpstreamConfig might be translated multiple times.
newUps, err = c.controller.translator.TranslateUpstreamConfig(cfg)
for _, subset := range subsets {
upsName := apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port)
// TODO: multiple cluster
ups, err := c.controller.apisix.Cluster(clusterName).Upstream().Get(ctx, upsName)
if err != nil {
log.Errorw("found malformed ApisixUpstream",
zap.Any("object", au),
zap.Error(err),
)
if err == apisixcache.ErrNotFound {
continue
}
log.Errorf("failed to get upstream %s: %s", upsName, err)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
} else {
newUps = apisixv1.NewDefaultUpstream()
}
var newUps *apisixv1.Upstream
if ev.Type != types.EventDelete {
cfg, ok := portLevelSettings[port.Port]
if !ok {
cfg = &au.Spec.ApisixUpstreamConfig
}
// FIXME Same ApisixUpstreamConfig might be translated multiple times.
newUps, err = c.controller.translator.TranslateUpstreamConfig(cfg)
if err != nil {
log.Errorw("found malformed ApisixUpstream",
zap.Any("object", au),
zap.Error(err),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
} else {
newUps = apisixv1.NewDefaultUpstream()
}

newUps.Metadata = ups.Metadata
newUps.Nodes = ups.Nodes
log.Debugw("updating upstream since ApisixUpstream changed",
zap.String("event", ev.Type.String()),
zap.Any("upstream", newUps),
zap.Any("ApisixUpstream", au),
)
if _, err := c.controller.apisix.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil {
log.Errorw("failed to update upstream",
zap.Error(err),
newUps.Metadata = ups.Metadata
newUps.Nodes = ups.Nodes
log.Debugw("updating upstream since ApisixUpstream changed",
zap.String("event", ev.Type.String()),
zap.Any("upstream", newUps),
zap.Any("ApisixUpstream", au),
zap.String("cluster", clusterName),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
if _, err := c.controller.apisix.Cluster(clusterName).Upstream().Update(ctx, newUps); err != nil {
log.Errorw("failed to update upstream",
zap.Error(err),
zap.Any("upstream", newUps),
zap.Any("ApisixUpstream", au),
zap.String("cluster", clusterName),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
}
}
c.controller.recorderEvent(au, corev1.EventTypeNormal, _resourceSynced, nil)
Expand Down
2 changes: 2 additions & 0 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ func (c *Controller) initWhenStartLeading() {
c.apisixClusterConfigLister = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister()

c.translator = translation.NewTranslator(&translation.TranslatorOptions{
PodCache: c.podCache,
PodLister: c.podLister,
EndpointsLister: c.epLister,
ServiceLister: c.svcLister,
ApisixUpstreamLister: c.apisixUpstreamLister,
Expand Down
39 changes: 27 additions & 12 deletions pkg/ingress/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/apache/apisix-ingress-controller/pkg/apisix"
apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
Expand Down Expand Up @@ -96,6 +97,18 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
log.Errorf("failed to get service %s/%s: %s", ep.Namespace, ep.Name, err)
return err
}
var subsets []configv1.ApisixUpstreamSubset
subsets = append(subsets, configv1.ApisixUpstreamSubset{})
au, err := c.controller.apisixUpstreamLister.ApisixUpstreams(ep.Namespace).Get(ep.Name)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace, ep.Name, err)
return err
}
} else if len(au.Spec.Subsets) > 0 {
subsets = append(subsets, au.Spec.Subsets...)
}

portMap := make(map[string]int32)
for _, port := range svc.Spec.Ports {
portMap[port.Name] = port.Port
Expand All @@ -109,18 +122,20 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
log.Errorf("port %s in endpoints %s/%s but not in service", port.Name, ep.Namespace, ep.Name)
continue
}
nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, svcPort)
if err != nil {
log.Errorw("failed to translate upstream nodes",
zap.Error(err),
zap.Any("endpoints", ep),
zap.Int32("port", svcPort),
)
}
name := apisixv1.ComposeUpstreamName(ep.Namespace, ep.Name, svcPort)
for _, cluster := range clusters {
if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil {
return err
for _, subset := range subsets {
nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, svcPort, subset.Labels)
if err != nil {
log.Errorw("failed to translate upstream nodes",
zap.Error(err),
zap.Any("endpoints", ep),
zap.Int32("port", svcPort),
)
}
name := apisixv1.ComposeUpstreamName(ep.Namespace, ep.Name, subset.Name, svcPort)
for _, cluster := range clusters {
if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil {
return err
}
}
}
}
Expand Down
43 changes: 39 additions & 4 deletions pkg/ingress/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/client-go/tools/cache"

"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
)

type podController struct {
Expand All @@ -35,6 +36,7 @@ func (c *Controller) newPodController() *podController {
ctl.controller.podInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctl.onAdd,
UpdateFunc: ctl.onUpdate,
DeleteFunc: ctl.onDelete,
},
)
Expand Down Expand Up @@ -67,10 +69,43 @@ func (c *podController) onAdd(obj interface{}) {
)
pod := obj.(*corev1.Pod)
if err := c.controller.podCache.Add(pod); err != nil {
log.Errorw("failed to add pod to cache",
zap.Error(err),
zap.Any("pod", pod),
)
if err == types.ErrPodNoAssignedIP {
log.Debugw("pod no assigned ip, postpone the adding in subsequent update event",
zap.Any("pod", pod),
)
} else {
log.Errorw("failed to add pod to cache",
zap.Error(err),
zap.Any("pod", pod),
)
}
}
}

func (c *podController) onUpdate(_, cur interface{}) {
pod := cur.(*corev1.Pod)

if !c.controller.namespaceWatching(pod.Namespace + "/" + pod.Name) {
return
}
log.Debugw("pod update event arrived",
zap.Any("final state", pod),
)
if pod.DeletionTimestamp != nil {
if err := c.controller.podCache.Delete(pod); err != nil {
log.Errorw("failed to delete pod from cache",
zap.Error(err),
zap.Any("pod", pod),
)
}
}
if pod.Status.PodIP != "" {
if err := c.controller.podCache.Add(pod); err != nil {
log.Errorw("failed to add pod to cache",
zap.Error(err),
zap.Any("pod", pod),
)
}
}
}

Expand Down
53 changes: 50 additions & 3 deletions pkg/ingress/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package ingress

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/pkg/types"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/apisix-ingress-controller/pkg/types"
)

func TestPodOnAdd(t *testing.T) {
Expand Down Expand Up @@ -109,3 +109,50 @@ func TestPodOnDelete(t *testing.T) {
assert.Equal(t, name, "abc")
assert.Nil(t, err)
}

func TestPodOnUpdate(t *testing.T) {
ctl := &podController{
controller: &Controller{
watchingNamespace: map[string]struct{}{
"default": {},
},
podCache: types.NewPodCache(),
},
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "nginx",
DeletionTimestamp: &metav1.Time{
Time: time.Now(),
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: "10.0.5.12",
},
}
assert.Nil(t, ctl.controller.podCache.Add(pod), "adding pod")

ctl.onUpdate(nil, pod)
name, err := ctl.controller.podCache.GetNameByIP("10.0.5.12")
assert.Equal(t, name, "nginx")
assert.Equal(t, err, nil)

pod2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "public",
Name: "abc",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: "10.0.5.13",
},
}
assert.Nil(t, ctl.controller.podCache.Add(pod2), "adding pod")
ctl.onUpdate(nil, pod2)
name, err = ctl.controller.podCache.GetNameByIP("10.0.5.13")
assert.Equal(t, name, "abc")
assert.Nil(t, err)
}
4 changes: 2 additions & 2 deletions pkg/kube/translation/apisix_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) (*TranslateConte
}
}

upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, p.Backend.ServiceName, int32(p.Backend.ServicePort))
upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, p.Backend.ServiceName, "", int32(p.Backend.ServicePort))
route := apisixv1.NewDefaultRoute()
route.Name = r.Host + p.Path
route.ID = id.GenID(route.Name)
Expand Down Expand Up @@ -150,7 +150,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha
return err
}

upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, svcPort)
upstreamName := apisixv1.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, svcPort)
route := apisixv1.NewDefaultRoute()
route.Name = apisixv1.ComposeRouteName(ar.Namespace, ar.Name, part.Name)
route.ID = id.GenID(route.Name)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kube/translation/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n
if err != nil {
return nil, err
}
ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, svcPort)
ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", svcPort)
ups.ID = id.GenID(ups.Name)
return ups, nil
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcNa
if err != nil {
return nil, err
}
ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, portNumber)
ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber)
ups.ID = id.GenID(ups.Name)
return ups, nil
}
Expand Down
Loading

0 comments on commit 3337be7

Please sign in to comment.