Skip to content

Commit

Permalink
feat: subset translation (#497)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokers committed May 31, 2021
1 parent 87b7229 commit a89be23
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 14 deletions.
6 changes: 6 additions & 0 deletions pkg/kube/apisix/apis/config/v2alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type ApisixRouteHTTPBackend struct {
ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
// Weight of this backend.
Weight *int `json:"weight" yaml:"weight"`
// Subset specifies a subset for the target Service. The subset should be pre-defined
// in ApisixUpstream about this service.
Subset string `json:"subset" yaml:"subset"`
}

// ApisixRouteHTTPPlugin represents an APISIX plugin.
Expand Down Expand Up @@ -232,6 +235,9 @@ type ApisixRouteTCPBackend struct {
// wise, the service ClusterIP or ExternalIP will be used,
// default is endpoints.
ResolveGranularity string `json:"resolveGranularity" yaml:"resolveGranularity"`
// Subset specifies a subset for the target Service. The subset should be pre-defined
// in ApisixUpstream about this service.
Subset string `json:"subset" yaml:"subset"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
9 changes: 3 additions & 6 deletions pkg/kube/translation/apisix_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (t *translator) TranslateRouteV1(ar *configv1.ApisixRoute) (*TranslateConte
route.UpstreamId = id.GenID(upstreamName)

if !ctx.checkUpstreamExist(upstreamName) {
ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, int32(p.Backend.ServicePort))
ups, err := t.TranslateUpstream(ar.Namespace, p.Backend.ServiceName, "", int32(p.Backend.ServicePort))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (t *translator) translateHTTPRoute(ctx *TranslateContext, ar *configv2alpha
}
ctx.addRoute(route)
if !ctx.checkUpstreamExist(upstreamName) {
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
Expand Down Expand Up @@ -312,10 +312,7 @@ func (t *translator) translateTCPRoute(ctx *TranslateContext, ar *configv2alpha1
name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, part.Name)
sr.ID = id.GenID(name)
sr.ServerPort = part.Match.IngressPort
// TODO use upstream id to refer the upstream object.
// Currently, APISIX doesn't use upstream_id field in
// APISIX, so we have to embed the entire upstream.
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kube/translation/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *n
} else {
svcPort = backend.Port.Number
}
ups, err := t.TranslateUpstream(namespace, backend.Name, svcPort)
ups, err := t.TranslateUpstream(namespace, backend.Name, "", svcPort)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcNa
} else {
portNumber = svcPort.IntVal
}
ups, err := t.TranslateUpstream(namespace, svcName, portNumber)
ups, err := t.TranslateUpstream(namespace, svcName, "", portNumber)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kube/translation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx *TranslateContext, ar *conf
if err != nil {
return nil, err
}
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
ups, err := t.translateUpstream(ar.Namespace, backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, svcPort)
if err != nil {
return nil, err
}
Expand Down
31 changes: 28 additions & 3 deletions pkg/kube/translation/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

Expand Down Expand Up @@ -54,7 +55,11 @@ type Translator interface {
// The returned Upstream doesn't have metadata info.
// It doesn't assign any metadata fields, so it's caller's responsibility to decide
// the metadata.
TranslateUpstream(string, string, int32) (*apisixv1.Upstream, error)
// Note the subset is used to filter the ultimate node list, only pods whose labels
// matching the subset labels (defined in ApisixUpstream) will be selected.
// When the subset is not found, the node list will be empty. When the subset is empty,
// all pods IP will be filled.
TranslateUpstream(string, string, string, int32) (*apisixv1.Upstream, error)
// TranslateIngress composes a couple of APISIX Routes and upstreams according
// to the given Ingress resource.
TranslateIngress(kube.Ingress) (*TranslateContext, error)
Expand All @@ -77,6 +82,7 @@ type Translator interface {
// TranslatorOptions contains options to help Translator
// work well.
type TranslatorOptions struct {
PodCache types.PodCache
PodLister listerscorev1.PodLister
EndpointsLister listerscorev1.EndpointsLister
ServiceLister listerscorev1.ServiceLister
Expand Down Expand Up @@ -112,7 +118,7 @@ func (t *translator) TranslateUpstreamConfig(au *configv1.ApisixUpstreamConfig)
return ups, nil
}

func (t *translator) TranslateUpstream(namespace, name string, port int32) (*apisixv1.Upstream, error) {
func (t *translator) TranslateUpstream(namespace, name, subset string, port int32) (*apisixv1.Upstream, error) {
endpoints, err := t.EndpointsLister.Endpoints(namespace).Get(name)
if err != nil {
return nil, &translateError{
Expand All @@ -128,14 +134,33 @@ func (t *translator) TranslateUpstream(namespace, name string, port int32) (*api
au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
ups.Nodes = nodes
// If subset in ApisixRoute is not empty but the ApisixUpstream resouce not found,
// just set an empty node list.
if subset != "" {
ups.Nodes = apisixv1.UpstreamNodes{}
} else {
ups.Nodes = nodes
}
return ups, nil
}
return nil, &translateError{
field: "ApisixUpstream",
reason: err.Error(),
}
}

// Filter nodes by subset.
if subset != "" {
var labels types.Labels
for _, ss := range au.Spec.Subsets {
if ss.Name == subset {
labels = ss.Labels
break
}
}
nodes = t.filterNodesByLabels(nodes, labels, au.Namespace)
}

upsCfg := &au.Spec.ApisixUpstreamConfig
for _, pls := range au.Spec.PortLevelSettings {
if pls.Port == port {
Expand Down
35 changes: 33 additions & 2 deletions pkg/kube/translation/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/id"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

Expand Down Expand Up @@ -109,8 +110,8 @@ loop:
return svc.Spec.ClusterIP, svcPort, nil
}

func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
ups, err := t.TranslateUpstream(namespace, svcName, svcPort)
func (t *translator) translateUpstream(namespace, svcName, subset, svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, error) {
ups, err := t.TranslateUpstream(namespace, svcName, subset, svcPort)
if err != nil {
return nil, err
}
Expand All @@ -128,6 +129,36 @@ func (t *translator) translateUpstream(namespace, svcName, svcResolveGranularity
return ups, nil
}

func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels types.Labels, namespace string) apisixv1.UpstreamNodes {
if labels == nil {
return nodes
}

var filteredNodes apisixv1.UpstreamNodes
for _, node := range nodes {
podName, err := t.PodCache.GetNameByIP(node.Host)
if err != nil {
log.Errorw("failed to find pod name by ip, ignore it",
zap.Error(err),
zap.String("pod_ip", node.Host),
)
continue
}
pod, err := t.PodLister.Pods(namespace).Get(podName)
if err != nil {
log.Errorw("failed to find pod, ignore it",
zap.Error(err),
zap.String("pod_name", podName),
)
continue
}
if labels.IsSubsetOf(pod.Labels) {
filteredNodes = append(filteredNodes, node)
}
}
return filteredNodes
}

func validateRemoteAddrs(remoteAddrs []string) error {
for _, addr := range remoteAddrs {
if ip := net.ParseIP(addr); ip == nil {
Expand Down
6 changes: 6 additions & 0 deletions samples/deploy/crd/v1beta1/ApisixRoute.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ spec:
weight:
type: integer
minimum: 0
subset:
type: string
required:
- serviceName
- servicePort
Expand All @@ -188,6 +190,8 @@ spec:
weight:
type: integer
minimum: 0
subset:
type: string
required:
- serviceName
- servicePort
Expand Down Expand Up @@ -238,6 +242,8 @@ spec:
resolveGranualrity:
type: string
enum: ["endpoint", "service"]
subset:
type: string
required:
- serviceName
- servicePort

0 comments on commit a89be23

Please sign in to comment.