Skip to content

Commit

Permalink
feat: support secret controller (#284)
Browse files Browse the repository at this point in the history
* feat: add secret controller

* fix: e2e test describe

* fix: add  license headers

* fix: add key for types.Event

* fix: e2e failed

* fix: use key instead of object

* fix: bug when delete
  • Loading branch information
gxthrj committed Mar 8, 2021
1 parent 8eea389 commit 4262138
Show file tree
Hide file tree
Showing 8 changed files with 429 additions and 294 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ module github.com/apache/apisix-ingress-controller
go 1.13

require (
github.com/gavv/httpexpect/v2 v2.2.0 // indirect
github.com/gin-gonic/gin v1.6.3
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/gruntwork-io/terratest v0.32.8 // indirect
github.com/hashicorp/go-memdb v1.0.4
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.11 // indirect
Expand Down
287 changes: 0 additions & 287 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/apisix/ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (s *sslClient) Delete(ctx context.Context, obj *v1.Ssl) error {
log.Debugw("try to delete ssl",
zap.String("id", obj.ID),
zap.String("cluster", s.clusterName),
zap.String("fullName", obj.FullName),
zap.String("url", s.url),
)
if err := s.cluster.HasSynced(ctx); err != nil {
Expand Down
39 changes: 38 additions & 1 deletion pkg/ingress/controller/apisix_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -34,6 +35,13 @@ import (
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/seven/state"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

var (
// the struct of secretSSLMap is a map[secretKey string]map[sslKey string]bool
// the xxxKey is format as namespace + "/" + name
secretSSLMap = sync.Map{}
)

type ApisixTLSController struct {
Expand Down Expand Up @@ -153,8 +161,37 @@ func (c *ApisixTLSController) syncHandler(tqo *TlsQueueObj) error {
// sync to apisix
log.Debug(tls)
log.Debug(tqo)
return state.SyncSsl(tls, tqo.Ope)
err = state.SyncSsl(tls, tqo.Ope)
// sync SyncSecretSSL
secretKey := fmt.Sprintf("%s_%s", apisixTls.Spec.Secret.Namespace, apisixTls.Spec.Secret.Name)
SyncSecretSSL(secretKey, tls, tqo.Ope)
return err
}
}

// SyncSecretSSL sync the secretSSLMap
// the struct of secretSSLMap is a map[secretKey string]map[sslKey string]bool
// the xxxKey is format as namespace + "_" + name
func SyncSecretSSL(key string, ssl *v1.Ssl, operator string) {
ssls, ok := secretSSLMap.Load(key)
if ok {
sslMap := ssls.(sync.Map)
switch operator {
case state.Delete:
sslMap.Delete(ssl.ID)
secretSSLMap.Store(key, sslMap)
default:
sslMap.Store(ssl.ID, ssl)
secretSSLMap.Store(key, sslMap)
}
} else {
if operator != state.Delete {
sslMap := sync.Map{}
sslMap.Store(ssl.ID, ssl)
secretSSLMap.Store(key, sslMap)
}
}

}

func (c *ApisixTLSController) addFunc(obj interface{}) {
Expand Down
17 changes: 15 additions & 2 deletions pkg/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,18 @@ type Controller struct {
svcLister listerscorev1.ServiceLister
ingressLister kube.IngressLister
ingressInformer cache.SharedIndexInformer
secretInformer cache.SharedIndexInformer
secretLister listerscorev1.SecretLister
apisixUpstreamInformer cache.SharedIndexInformer
apisixUpstreamLister listersv1.ApisixUpstreamLister
apisixRouteLister kube.ApisixRouteLister
apisixRouteInformer cache.SharedIndexInformer

// resource controllers
endpointsController *endpointsController
ingressController *ingressController
endpointsController *endpointsController
ingressController *ingressController
secretController *secretController

apisixUpstreamController *apisixUpstreamController
apisixRouteController *apisixRouteController
}
Expand Down Expand Up @@ -159,6 +163,8 @@ func NewController(cfg *config.Config) (*Controller, error) {
svcLister: kube.CoreSharedInformerFactory.Core().V1().Services().Lister(),
ingressLister: ingressLister,
ingressInformer: ingressInformer,
secretInformer: kube.CoreSharedInformerFactory.Core().V1().Secrets().Informer(),
secretLister: kube.CoreSharedInformerFactory.Core().V1().Secrets().Lister(),
apisixRouteInformer: apisixRouteInformer,
apisixRouteLister: apisixRouteLister,
apisixUpstreamInformer: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
Expand All @@ -174,6 +180,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
c.apisixUpstreamController = c.newApisixUpstreamController()
c.apisixRouteController = c.newApisixRouteController()
c.ingressController = c.newIngressController()
c.secretController = c.newSecretController()

return c, nil
}
Expand Down Expand Up @@ -307,6 +314,12 @@ func (c *Controller) run(ctx context.Context) {
c.goAttach(func() {
c.apisixRouteController.run(ctx)
})
c.goAttach(func() {
c.secretInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.secretController.run(ctx)
})

ac := &Api6Controller{
KubeClientSet: c.clientset,
Expand Down
226 changes: 226 additions & 0 deletions pkg/ingress/controller/secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/seven/state"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

type secretController struct {
controller *Controller
workqueue workqueue.RateLimitingInterface
workers int
}

func (c *Controller) newSecretController() *secretController {
ctl := &secretController{
controller: c,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Secrets"),
workers: 1,
}

ctl.controller.secretInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctl.onAdd,
UpdateFunc: ctl.onUpdate,
DeleteFunc: ctl.onDelete,
},
)

return ctl
}

func (c *secretController) run(ctx context.Context) {
log.Info("secret controller started")
defer log.Info("secret controller exited")

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.secretInformer.HasSynced); !ok {
log.Error("informers sync failed")
return
}

handler := func() {
for {
obj, shutdown := c.workqueue.Get()
if shutdown {
return
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
event := obj.(*types.Event)
if key, ok := event.Object.(string); !ok {
c.workqueue.Forget(obj)
return fmt.Errorf("expected Secret in workqueue but got %#v", obj)
} else {
if err := c.sync(ctx, event); err != nil {
c.workqueue.AddRateLimited(obj)
log.Errorf("sync secret with ssl %s failed", key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}
c.workqueue.Forget(obj)
return nil
}
}(obj)
if err != nil {
runtime.HandleError(err)
}
}
}

for i := 0; i < c.workers; i++ {
go handler()
}

<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
key := ev.Object.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Errorf("invalid resource key: %s", key)
return err
}
sec, err := c.controller.secretLister.Secrets(namespace).Get(name)

secretMapkey := namespace + "_" + name
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorw("failed to get Secret",
zap.String("key", secretMapkey),
zap.Error(err),
)
return err
}

if ev.Type != types.EventDelete {
log.Warnw("Secret was deleted before it can be delivered",
zap.String("key", secretMapkey),
)
return nil
}
}
if ev.Type == types.EventDelete {
if sec != nil {
// We still find the resource while we are processing the DELETE event,
// that means object with same namespace and name was created, discarding
// this stale DELETE event.
log.Warnw("discard the stale secret delete event since the resource still exists",
zap.String("key", secretMapkey),
)
return nil
}
sec = ev.Tombstone.(*corev1.Secret)
}
// sync SSL in APISIX which is store in secretSSLMap
// FixMe Need to update the status of CRD ApisixTls
ssls, ok := secretSSLMap.Load(secretMapkey)
if ok {
sslMap := ssls.(sync.Map)
sslMap.Range(func(_, v interface{}) bool {
ssl := v.(*apisixv1.Ssl)
ssl.FullName = ssl.ID
err = state.SyncSsl(ssl, ev.Type.String())
if err != nil {
return false
}
return true
})
}
return err
}

func (c *secretController) onAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found secret object with bad namespace/name: %s, ignore it", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}

c.workqueue.AddRateLimited(&types.Event{
Type: types.EventAdd,
Object: key,
})
}

func (c *secretController) onUpdate(prev, curr interface{}) {
prevSec := prev.(*corev1.Secret)
currSec := curr.(*corev1.Secret)

if prevSec.GetResourceVersion() == currSec.GetResourceVersion() {
return
}
key, err := cache.MetaNamespaceKeyFunc(currSec)
if err != nil {
log.Errorf("found secrets object with bad namespace/name: %s, ignore it", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}
c.workqueue.AddRateLimited(&types.Event{
Type: types.EventUpdate,
Object: key,
})
}

func (c *secretController) onDelete(obj interface{}) {
sec, ok := obj.(*corev1.Secret)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("found secrets: %+v in bad tombstone state", obj)
return
}
sec = tombstone.Obj.(*corev1.Secret)
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found secret resource with bad meta namesapce key: %s", err)
return
}
// FIXME Refactor Controller.namespaceWatching to just use
// namespace after all controllers use the same way to fetch
// the object.
if !c.controller.namespaceWatching(key) {
return
}
c.workqueue.AddRateLimited(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: sec,
})
}
2 changes: 0 additions & 2 deletions pkg/seven/state/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,9 @@ func SyncSsl(ssl *v1.Ssl, method string) error {
_, err := conf.Client.Cluster(cluster).SSL().Create(context.TODO(), ssl)
return err
case Update:
// FIXME we don't know the full name of SSL.
_, err := conf.Client.Cluster(cluster).SSL().Update(context.TODO(), ssl)
return err
case Delete:
// FIXME we don't know the full name of SSL.
return conf.Client.Cluster(cluster).SSL().Delete(context.TODO(), ssl)
}
return nil
Expand Down

0 comments on commit 4262138

Please sign in to comment.