diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go index 1fdf4827b9..c978e924b9 100644 --- a/pkg/kube/apisix/apis/config/v2alpha1/types.go +++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go @@ -174,6 +174,9 @@ type ApisixRouteHTTPBackend struct { ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"` // Weight of this backend. Weight *int `json:"weight" yaml:"weight"` + // Subset specifies a subset for the target Service. The subset should be pre-defined + // in ApisixUpstream about this service. + Subset string `json:"subset" yaml:"subset"` } // ApisixRouteHTTPPlugin represents an APISIX plugin. @@ -232,6 +235,9 @@ type ApisixRouteTCPBackend struct { // wise, the service ClusterIP or ExternalIP will be used, // default is endpoints. ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"` + // Subset specifies a subset for the target Service. The subset should be pre-defined + // in ApisixUpstream about this service. + Subset string `json:"subset" yaml:"subset"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/kube/translation/apisix_route.go b/pkg/kube/translation/apisix_route.go index dceda4f9fb..2360d72eef 100644 --- a/pkg/kube/translation/apisix_route.go +++ b/pkg/kube/translation/apisix_route.go @@ -64,7 +64,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) (*TranslateConte route.UpstreamId = id.GenID(upstreamName) if !ctx.checkUpstreamExist(upstreamName) { - ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, int32(p.Backend.ServicePort)) + ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, "", int32(p.Backend.ServicePort)) if err != nil { return nil, err } @@ -181,7 +181,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha } ctx.addRoute(route) if !ctx.checkUpstreamExist(upstreamName) { - ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) if err != nil { return err } @@ -312,10 +312,7 @@ func (t *translator) translateTCPRoute(ctx *TranslateContext, ar *configv2alpha1 name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name) sr.ID = id.GenID(name) sr.ServerPort = part.Match.IngressPort - // TODO use upstream id to refer the upstream object. - // Currently, APISIX doesn't use upstream_id field in - // APISIX, so we have to embed the entire upstream. - ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) if err != nil { return err } diff --git a/pkg/kube/translation/ingress.go b/pkg/kube/translation/ingress.go index ab0ea6f3e3..5169b68cb2 100644 --- a/pkg/kube/translation/ingress.go +++ b/pkg/kube/translation/ingress.go @@ -169,7 +169,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n } else { svcPort = backend.Port.Number } - ups, err := t.TranslateUpstream(namespace, backend.Name, svcPort) + ups, err := t.TranslateUpstream(namespace, backend.Name, "", svcPort) if err != nil { return nil, err } @@ -260,7 +260,7 @@ func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcNa } else { portNumber = svcPort.IntVal } - ups, err := t.TranslateUpstream(namespace, svcName, portNumber) + ups, err := t.TranslateUpstream(namespace, svcName, "", portNumber) if err != nil { return nil, err } diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go index d58379d72f..3cc649030f 100644 --- a/pkg/kube/translation/plugin.go +++ b/pkg/kube/translation/plugin.go @@ -38,7 +38,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ar *conf if err != nil { return nil, err } - ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort) + ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort) if err != nil { return nil, err } diff --git a/pkg/kube/translation/translator.go b/pkg/kube/translation/translator.go index e15a6f7200..aed32515af 100644 --- a/pkg/kube/translation/translator.go +++ b/pkg/kube/translation/translator.go @@ -25,6 +25,7 @@ import ( configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1" configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1" listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1" + "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -54,7 +55,11 @@ type Translator interface { // The returned Upstream doesn't have metadata info. // It doesn't assign any metadata fields, so it's caller's responsibility to decide // the metadata. - TranslateUpstream(string, string, int32) (*apisixv1.Upstream, error) + // Note the subset is used to filter the ultimate node list, only pods whose labels + // matching the subset labels (defined in ApisixUpstream) will be selected. + // When the subset is not found, the node list will be empty. When the subset is empty, + // all pods IP will be filled. + TranslateUpstream(string, string, string, int32) (*apisixv1.Upstream, error) // TranslateIngress composes a couple of APISIX Routes and upstreams according // to the given Ingress resource. TranslateIngress(kube.Ingress) (*TranslateContext, error) @@ -77,6 +82,7 @@ type Translator interface { // TranslatorOptions contains options to help Translator // work well. type TranslatorOptions struct { + PodCache types.PodCache PodLister listerscorev1.PodLister EndpointsLister listerscorev1.EndpointsLister ServiceLister listerscorev1.ServiceLister @@ -112,7 +118,7 @@ func (t *translator) TranslateUpstreamConfig(au *configv1.ApisixUpstreamConfig) return ups, nil } -func (t *translator) TranslateUpstream(namespace, name string, port int32) (*apisixv1.Upstream, error) { +func (t *translator) TranslateUpstream(namespace, name, subset string, port int32) (*apisixv1.Upstream, error) { endpoints, err := t.EndpointsLister.Endpoints(namespace).Get(name) if err != nil { return nil, &translateError{ @@ -128,7 +134,13 @@ func (t *translator) TranslateUpstream(namespace, name string, port int32) (*api au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { - ups.Nodes = nodes + // If subset in ApisixRoute is not empty but the ApisixUpstream resouce not found, + // just set an empty node list. + if subset != "" { + ups.Nodes = apisixv1.UpstreamNodes{} + } else { + ups.Nodes = nodes + } return ups, nil } return nil, &translateError{ @@ -136,6 +148,19 @@ func (t *translator) TranslateUpstream(namespace, name string, port int32) (*api reason: err.Error(), } } + + // Filter nodes by subset. + if subset != "" { + var labels types.Labels + for _, ss := range au.Spec.Subsets { + if ss.Name == subset { + labels = ss.Labels + break + } + } + nodes = t.filterNodesByLabels(nodes, labels, au.Namespace) + } + upsCfg := &au.Spec.ApisixUpstreamConfig for _, pls := range au.Spec.PortLevelSettings { if pls.Port == port { diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go index 16ced5aaf9..3584205d04 100644 --- a/pkg/kube/translation/util.go +++ b/pkg/kube/translation/util.go @@ -24,6 +24,7 @@ import ( "github.com/apache/apisix-ingress-controller/pkg/id" configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1" "github.com/apache/apisix-ingress-controller/pkg/log" + "github.com/apache/apisix-ingress-controller/pkg/types" apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" ) @@ -109,8 +110,8 @@ loop: return svc.Spec.ClusterIP, svcPort, nil } -func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) { - ups, err := t.TranslateUpstream(namespace, svcName, svcPort) +func (t *translator) translateUpstream(namespace, svcName, subset, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) { + ups, err := t.TranslateUpstream(namespace, svcName, subset, svcPort) if err != nil { return nil, err } @@ -128,6 +129,36 @@ func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity return ups, nil } +func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels types.Labels, namespace string) apisixv1.UpstreamNodes { + if labels == nil { + return nodes + } + + var filteredNodes apisixv1.UpstreamNodes + for _, node := range nodes { + podName, err := t.PodCache.GetNameByIP(node.Host) + if err != nil { + log.Errorw("failed to find pod name by ip, ignore it", + zap.Error(err), + zap.String("pod_ip", node.Host), + ) + continue + } + pod, err := t.PodLister.Pods(namespace).Get(podName) + if err != nil { + log.Errorw("failed to find pod, ignore it", + zap.Error(err), + zap.String("pod_name", podName), + ) + continue + } + if labels.IsSubsetOf(pod.Labels) { + filteredNodes = append(filteredNodes, node) + } + } + return filteredNodes +} + func validateRemoteAddrs(remoteAddrs []string) error { for _, addr := range remoteAddrs { if ip := net.ParseIP(addr); ip == nil { diff --git a/samples/deploy/crd/v1beta1/ApisixRoute.yaml b/samples/deploy/crd/v1beta1/ApisixRoute.yaml index 89f4e1af2b..974795e130 100644 --- a/samples/deploy/crd/v1beta1/ApisixRoute.yaml +++ b/samples/deploy/crd/v1beta1/ApisixRoute.yaml @@ -166,6 +166,8 @@ spec: weight: type: integer minimum: 0 + subset: + type: string required: - serviceName - servicePort @@ -188,6 +190,8 @@ spec: weight: type: integer minimum: 0 + subset: + type: string required: - serviceName - servicePort @@ -238,6 +242,8 @@ spec: resolveGranualrity: type: string enum: ["endpoint", "service"] + subset: + type: string required: - serviceName - servicePort