Skip to content

Commit

Permalink
feat: support KIngress SSL, ingress.class, visibility & traffic-split
Browse files Browse the repository at this point in the history
Add support for KIngress SSL, ingress.class, visibility & traffic-split, but all these functionalities are not ready now. KIngress' SSL feature waits for APISIX Route to support SSL. The same for ingress.class and visibility. APISIX traffic-split plugin can only work at route level, not per upstream level in each route. So the plugin cannot fulfill KIngress's requirement at the moment.

Signed-off-by: Fang <37891485+fhuzero@users.noreply.github.com>
  • Loading branch information
fhuzero committed Jul 11, 2021
1 parent 86792ec commit df3b1a2
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 38 deletions.
65 changes: 62 additions & 3 deletions pkg/ingress/manifest.go
Expand Up @@ -108,35 +108,72 @@ func diffStreamRoutes(olds, news []*apisixv1.StreamRoute) (added, updated, delet
return
}

func diffSsls(olds, news []*apisixv1.Ssl) (added, updated, deleted []*apisixv1.Ssl) {
if olds == nil {
return news, nil, nil
}
if news == nil {
return nil, nil, olds
}

oldMap := make(map[string]*apisixv1.Ssl, len(olds))
newMap := make(map[string]*apisixv1.Ssl, len(news))
for _, ssl := range olds {
oldMap[ssl.ID] = ssl
}
for _, ssl := range news {
newMap[ssl.ID] = ssl
}

for _, ssl := range news {
if or, ok := oldMap[ssl.ID]; !ok {
added = append(added, ssl)
} else if !reflect.DeepEqual(or, ssl) {
updated = append(updated, ssl)
}
}
for _, ssl := range olds {
if _, ok := newMap[ssl.ID]; !ok {
deleted = append(deleted, ssl)
}
}
return
}

type manifest struct {
routes []*apisixv1.Route
upstreams []*apisixv1.Upstream
streamRoutes []*apisixv1.StreamRoute
ssls []*apisixv1.Ssl
}

func (m *manifest) diff(om *manifest) (added, updated, deleted *manifest) {
ar, ur, dr := diffRoutes(om.routes, m.routes)
au, uu, du := diffUpstreams(om.upstreams, m.upstreams)
asr, usr, dsr := diffStreamRoutes(om.streamRoutes, m.streamRoutes)
if ar != nil || au != nil || asr != nil {
assl, ussl, dssl := diffSsls(om.ssls, m.ssls)
if ar != nil || au != nil || asr != nil || assl != nil {
added = &manifest{
routes: ar,
upstreams: au,
streamRoutes: asr,
ssls: assl,
}
}
if ur != nil || uu != nil || usr != nil {
if ur != nil || uu != nil || usr != nil || ussl != nil {
updated = &manifest{
routes: ur,
upstreams: uu,
streamRoutes: usr,
ssls: ussl,
}
}
if dr != nil || du != nil || dsr != nil {
if dr != nil || du != nil || dsr != nil || dssl != nil {
deleted = &manifest{
routes: dr,
upstreams: du,
streamRoutes: dsr,
ssls: dssl,
}
}
return
Expand Down Expand Up @@ -170,6 +207,18 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted
}
}
}
for _, ssl := range deleted.ssls {
if err := c.apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil {
// Upstream might be referenced by other routes.
if err != cache.ErrStillInUse {
merr = multierror.Append(merr, err)
} else {
log.Infow("ssl was referenced by other routes",
zap.String("ssl_id", ssl.ID),
)
}
}
}
}
if added != nil {
// Should create upstreams firstly due to the dependencies.
Expand All @@ -188,6 +237,11 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted
merr = multierror.Append(merr, err)
}
}
for _, ssl := range added.ssls {
if _, err := c.apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
}
if updated != nil {
for _, r := range updated.upstreams {
Expand All @@ -205,6 +259,11 @@ func (c *Controller) syncManifests(ctx context.Context, added, updated, deleted
merr = multierror.Append(merr, err)
}
}
for _, ssl := range updated.ssls {
if _, err := c.apisix.Cluster(clusterName).SSL().Create(ctx, ssl); err != nil {
merr = multierror.Append(merr, err)
}
}
}
if merr != nil {
return merr
Expand Down
5 changes: 5 additions & 0 deletions pkg/kube/translation/context.go
Expand Up @@ -21,6 +21,7 @@ type TranslateContext struct {
Routes []*apisix.Route
StreamRoutes []*apisix.StreamRoute
Upstreams []*apisix.Upstream
Ssls []*apisix.Ssl

upstreamMap map[string]struct{}
}
Expand All @@ -41,6 +42,10 @@ func (tc *TranslateContext) addUpstream(u *apisix.Upstream) {
tc.Upstreams = append(tc.Upstreams, u)
}

func (tc *TranslateContext) addSsl(ssl *apisix.Ssl) {
tc.Ssls = append(tc.Ssls, ssl)
}

func (tc *TranslateContext) checkUpstreamExist(name string) (ok bool) {
_, ok = tc.upstreamMap[name]
return
Expand Down
178 changes: 143 additions & 35 deletions pkg/kube/translation/knative_ingress.go
Expand Up @@ -16,55 +16,120 @@ package translation

import (
"fmt"
"github.com/apache/apisix-ingress-controller/pkg/id"
"github.com/apache/apisix-ingress-controller/pkg/log"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"knative.dev/networking/pkg/apis/networking"
"strings"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/intstr"
knativev1alpha1 "knative.dev/networking/pkg/apis/networking/v1alpha1"
"strings"

"github.com/apache/apisix-ingress-controller/pkg/id"
"github.com/apache/apisix-ingress-controller/pkg/log"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

func (t *translator) translateKnativeIngressV1alpha1(ing *knativev1alpha1.Ingress) (*TranslateContext, error) {
ctx := &TranslateContext{
upstreamMap: make(map[string]struct{}),
}
// TODO: Ensures that apisix-ingress-controller does not pick knative ingress up when ingress.class annotation is incorrect.
// waiting for APISIX Route to support ingress.class
// see https://github.com/apache/apisix-ingress-controller/issues/451
if ingClass, ok := ing.Annotations[networking.IngressClassAnnotationKey]; !ok || !strings.Contains(ingClass, "apisix") {
log.Infow("ingress.class not configured to apisix, translation aborted",
zap.Any("knative ingress", ing))
//return nil, fmt.Errorf("ingress.class is configured to %s, not apisix, translation aborted", ingClass)
}

plugins := t.translateAnnotations(ing.Annotations)

// TODO: TestIngressConformance/tls cannot PASS, waiting for APISIX Route to support SNI based TLS
// see https://github.com/apache/apisix-ingress-controller/issues/547
// ApisixTls Reference: http://apisix.apache.org/docs/ingress-controller/references/apisix_tls/
/*
var ssls []*apisixv1.Ssl
for _, ingressTLS := range ing.Spec.TLS {
ssl, err := t.translateTLSFromKnativeIngressV1alpha1(ingressTLS)
if err != nil {
log.Errorw("failed to translate ingressTLS to apisixSsl",
zap.Error(err),
zap.Any("apisixSsl", ssl),
)
return nil, err
}
ssls = append(ssls, ssl)
log.Debugw("got SSL object from ApisixTls",
zap.Any("ssl", ssl),
)
}
ctx.Ssls = ssls
*/

for i, rule := range ing.Spec.Rules {
hosts := rule.Hosts
if rule.HTTP == nil {
continue
}
var ruleExtVisibility bool
if rule.Visibility == knativev1alpha1.IngressVisibilityExternalIP {
ruleExtVisibility = true
}
// from https://github.com/knative-sandbox/net-kourier/blob/dd1b827bb5b21c874222c18fc7fc1f3c54e40ee9/pkg/generator/ingress_translator.go#L95
//ruleName := fmt.Sprintf("(%s/%s).Rules[%d]", ing.Namespace, ing.Name, i)
//fmt.Printf("In func translateKnativeIngressV1alpha1(): ruleName = %s", ruleName)
//routes := make([]*route.Route, 0, len(rule.HTTP.Paths))
for j, httpPath := range rule.HTTP.Paths {
// TODO: no header matcher since neither APISIX Route nor APISIX Plugin (such as traffic-split) supports header matching
// Note the N:1 mapping from rule.HTTP.Paths to httpPath and from httpPath.Splits to split
// Default the path to "/" if none is passed.
path := httpPath.Path
if path == "" {
path = "/"
}
headers := make(map[string]string)
for key, value := range httpPath.AppendHeaders {
headers[key] = value
}
var (
ups *apisixv1.Upstream
err error
upstreams []*apisixv1.Upstream
percents []int // Caution: elements in upstreams and percents are corresponding by index
)
knativeBackend := knativeSelectSplit(httpPath.Splits)
serviceName := knativeBackend.IngressBackend.ServiceName
servicePort := knativeBackend.IngressBackend.ServicePort
for _, split := range httpPath.Splits {
//split := knativeSelectSplit(httpPath.Splits)
// The FQN of the service is sufficient here, as clusters towards the
// same service are supposed to be deduplicated anyway.
//splitName := fmt.Sprintf("%s/%s", split.ServiceNamespace, split.ServiceName)
ingressBackend := split.IngressBackend

if serviceName != "" {
ups, err = t.translateUpstreamFromKnativeIngressV1alpha1(ing.Namespace, serviceName, servicePort)
if err != nil {
log.Errorw("failed to translate knative ingress backend to upstream",
zap.Error(err),
zap.Any("knative ingress", ing),
)
return nil, err
if ingressBackend.ServiceName != "" {
upstream, err := t.translateUpstreamFromKnativeIngressV1alpha1(ingressBackend.ServiceNamespace, ingressBackend.ServiceName, ingressBackend.ServicePort)
if err != nil {
log.Errorw("failed to translate knative ingress backend to upstream",
zap.Error(err),
zap.Any("knative ingress", ing),
zap.Any("split", split),
)
return nil, err
}
upstreams = append(upstreams, upstream)
percents = append(percents, split.Percent)
ctx.addUpstream(upstream)
}
// TODO: Current APISIX Route model and Plugins do not support AppendHeaders after traffic split
// Knative requires two phase of AppendHeaders, first for httpPath, second for each split.
// httpPath -> AppendHeaders -> Split1 -> AppendHeaders
// \
// > Split2 -> AppendHeaders
// Now appends headers from all splits.
for key, value := range split.AppendHeaders {
headers[key] = value
}
ctx.addUpstream(ups)
}
route := apisixv1.NewDefaultRoute()
// TODO: Figure out a way to name the routes (See Kong ingress controller #834)
route.Name = composeKnativeIngressRouteName(ing.Namespace, ing.Name, i, j)
route.ID = id.GenID(route.Name)
route.Hosts = hosts
uris := []string{httpPath.Path}
// httpPath.Path represents a literal prefix to which this rule should apply.
// As per the specification of Ingress path matching rule:
Expand All @@ -83,34 +148,43 @@ func (t *translator) translateKnativeIngressV1alpha1(ing *knativev1alpha1.Ingres
prefix += "/*"
}
uris = append(uris, prefix)

route := apisixv1.NewDefaultRoute()
// TODO: Figure out a way to name the routes (See Kong ingress controller #834)
route.Name = composeKnativeIngressRouteName(ing.Namespace, ing.Name, i, j)
route.ID = id.GenID(route.Name)
route.Hosts = hosts
route.Uris = uris
route.EnableWebsocket = true
if !ruleExtVisibility {
// host and hosts, remote_addr and remote_addrs cannot exist at the same time, only one of them can be selected.
// If enabled at the same time, the API will respond with an error.
//route.RemoteAddrs = []string{"10.96.0.0/16"}
}

// add APISIX plugin "proxy-rewrite" to support KIngress' `appendHeaders` property
var proxyRewritePlugin apisixv1.RewriteConfig
headers := make(map[string]string)
for key, value := range knativeBackend.AppendHeaders {
headers[key] = value
}
for key, value := range httpPath.AppendHeaders {
headers[key] = value
}
if len(headers) > 0 {
if len(headers) > 0 || httpPath.RewriteHost != "" {
proxyRewritePlugin.RewriteHeaders = headers
proxyRewritePlugin.RewriteHost = httpPath.RewriteHost
plugins["proxy-rewrite"] = proxyRewritePlugin
}

if len(upstreams) > 0 {
route.UpstreamId = upstreams[0].ID
}
var trafficSplitPlugin apisixv1.TrafficSplitConfig
if len(upstreams) > 1 && len(upstreams) == len(percents) {
var trafficSplitConfigRule apisixv1.TrafficSplitConfigRule
var weightedUpstream apisixv1.TrafficSplitConfigRuleWeightedUpstream
weightedUpstream.Weight = percents[0]
trafficSplitConfigRule.WeightedUpstreams = append(trafficSplitConfigRule.WeightedUpstreams, *weightedUpstream.DeepCopy())
for i := 1; i < len(upstreams); i++ {
weightedUpstream.UpstreamID = upstreams[i].ID
weightedUpstream.Weight = percents[i] // won't panic since upstreams and percents are of equal length
trafficSplitConfigRule.WeightedUpstreams = append(trafficSplitConfigRule.WeightedUpstreams, *weightedUpstream.DeepCopy())
}
trafficSplitPlugin.Rules = append(trafficSplitPlugin.Rules, trafficSplitConfigRule)
plugins["traffic-split"] = trafficSplitPlugin
}

if len(plugins) > 0 {
route.Plugins = *(plugins.DeepCopy())
}
if ups != nil {
route.UpstreamId = ups.ID
}
ctx.addRoute(route)
}
}
Expand All @@ -122,6 +196,10 @@ func (t *translator) translateUpstreamFromKnativeIngressV1alpha1(namespace strin
if svcPort.Type == intstr.String {
svc, err := t.ServiceLister.Services(namespace).Get(svcName)
if err != nil {
log.Errorf("In translateUpstreamFromKnativeIngressV1alpha1(): service not found",
zap.String("namespace", namespace),
zap.String("svcName", svcName),
zap.Any("svcPort", svcPort))
return nil, err
}
for _, port := range svc.Spec.Ports {
Expand All @@ -148,6 +226,36 @@ func (t *translator) translateUpstreamFromKnativeIngressV1alpha1(namespace strin
return ups, nil
}

func (t *translator) translateTLSFromKnativeIngressV1alpha1(tls knativev1alpha1.IngressTLS) (*apisixv1.Ssl, error) {
s, err := t.SecretLister.Secrets(tls.SecretNamespace).Get(tls.SecretName)
if err != nil {
return nil, err
}
cert, ok := s.Data["cert"]
if !ok {
return nil, ErrEmptyCert
}
key, ok := s.Data["key"]
if !ok {
return nil, ErrEmptyPrivKey
}
var snis []string
for _, host := range tls.Hosts {
snis = append(snis, host)
}
ssl := &apisixv1.Ssl{
ID: id.GenID(tls.SecretNamespace + "_" + tls.SecretName),
Snis: snis,
Cert: string(cert),
Key: string(key),
Status: 1,
Labels: map[string]string{
"managed-by": "apisix-ingress-controller",
},
}
return ssl, nil
}

func knativeSelectSplit(splits []knativev1alpha1.IngressBackendSplit) knativev1alpha1.IngressBackendSplit {
if len(splits) == 0 {
return knativev1alpha1.IngressBackendSplit{}
Expand Down

0 comments on commit df3b1a2

Please sign in to comment.