Skip to content

Commit

Permalink
feat: sync CRD and ingress resource to APISIX mechanism. (#1022)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlinsRan committed Jun 22, 2022
1 parent 50d6026 commit 6394cdd
Show file tree
Hide file tree
Showing 17 changed files with 528 additions and 46 deletions.
1 change: 1 addition & 0 deletions cmd/ingress/ingress.go
Expand Up @@ -171,6 +171,7 @@ For example, no available LB exists in the bare metal environment.`)
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey, "default-apisix-cluster-admin-key", "", "admin key used for the authorization of admin api / manager api for the default APISIX cluster")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName, "default-apisix-cluster-name", "default", "name of the default apisix cluster")
cmd.PersistentFlags().DurationVar(&cfg.ApisixResourceSyncInterval.Duration, "apisix-resource-sync-interval", 300*time.Second, "interval between syncs in seconds. Default value is 300s.")

if err := cmd.PersistentFlags().MarkDeprecated("app-namespace", "use namespace-selector instead"); err != nil {
dief("failed to mark `app-namespace` as deprecated: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion conf/config-default.yaml
Expand Up @@ -44,7 +44,7 @@ ingress_status_address: [] # when there is no available information on the Ser
# For example, no available LB exists in the bare metal environment.
enable_profiling: true # enable profiling via web interfaces
# host:port/debug/pprof, default is true.

apisix-resource-sync-interval: "300s" # Default interval for synchronizing Kubernetes resources to APISIX
# Kubernetes related configurations.
kubernetes:
kubeconfig: "" # the Kubernetes configuration file path, default is
Expand Down
42 changes: 22 additions & 20 deletions pkg/config/config.go
Expand Up @@ -72,17 +72,18 @@ const (
// Config contains all config items which are necessary for
// apisix-ingress-controller's running.
type Config struct {
CertFilePath string `json:"cert_file" yaml:"cert_file"`
KeyFilePath string `json:"key_file" yaml:"key_file"`
LogLevel string `json:"log_level" yaml:"log_level"`
LogOutput string `json:"log_output" yaml:"log_output"`
HTTPListen string `json:"http_listen" yaml:"http_listen"`
HTTPSListen string `json:"https_listen" yaml:"https_listen"`
IngressPublishService string `json:"ingress_publish_service" yaml:"ingress_publish_service"`
IngressStatusAddress []string `json:"ingress_status_address" yaml:"ingress_status_address"`
EnableProfiling bool `json:"enable_profiling" yaml:"enable_profiling"`
Kubernetes KubernetesConfig `json:"kubernetes" yaml:"kubernetes"`
APISIX APISIXConfig `json:"apisix" yaml:"apisix"`
CertFilePath string `json:"cert_file" yaml:"cert_file"`
KeyFilePath string `json:"key_file" yaml:"key_file"`
LogLevel string `json:"log_level" yaml:"log_level"`
LogOutput string `json:"log_output" yaml:"log_output"`
HTTPListen string `json:"http_listen" yaml:"http_listen"`
HTTPSListen string `json:"https_listen" yaml:"https_listen"`
IngressPublishService string `json:"ingress_publish_service" yaml:"ingress_publish_service"`
IngressStatusAddress []string `json:"ingress_status_address" yaml:"ingress_status_address"`
EnableProfiling bool `json:"enable_profiling" yaml:"enable_profiling"`
Kubernetes KubernetesConfig `json:"kubernetes" yaml:"kubernetes"`
APISIX APISIXConfig `json:"apisix" yaml:"apisix"`
ApisixResourceSyncInterval types.TimeDuration `json:"apisix-resource-sync-interval" yaml:"apisix-resource-sync-interval"`
}

// KubernetesConfig contains all Kubernetes related config items.
Expand Down Expand Up @@ -118,15 +119,16 @@ type APISIXConfig struct {
// default value.
func NewDefaultConfig() *Config {
return &Config{
LogLevel: "warn",
LogOutput: "stderr",
HTTPListen: ":8080",
HTTPSListen: ":8443",
IngressPublishService: "",
IngressStatusAddress: []string{},
CertFilePath: "/etc/webhook/certs/cert.pem",
KeyFilePath: "/etc/webhook/certs/key.pem",
EnableProfiling: true,
LogLevel: "warn",
LogOutput: "stderr",
HTTPListen: ":8080",
HTTPSListen: ":8443",
IngressPublishService: "",
IngressStatusAddress: []string{},
CertFilePath: "/etc/webhook/certs/cert.pem",
KeyFilePath: "/etc/webhook/certs/key.pem",
EnableProfiling: true,
ApisixResourceSyncInterval: types.TimeDuration{Duration: 300 * time.Second},
Kubernetes: KubernetesConfig{
Kubeconfig: "", // Use in-cluster configurations.
ResyncInterval: types.TimeDuration{Duration: 6 * time.Hour},
Expand Down
41 changes: 23 additions & 18 deletions pkg/config/config_test.go
Expand Up @@ -28,15 +28,16 @@ import (

func TestNewConfigFromFile(t *testing.T) {
cfg := &Config{
LogLevel: "warn",
LogOutput: "stdout",
HTTPListen: ":9090",
HTTPSListen: ":9443",
IngressPublishService: "",
IngressStatusAddress: []string{},
CertFilePath: "/etc/webhook/certs/cert.pem",
KeyFilePath: "/etc/webhook/certs/key.pem",
EnableProfiling: true,
LogLevel: "warn",
LogOutput: "stdout",
HTTPListen: ":9090",
HTTPSListen: ":9443",
IngressPublishService: "",
IngressStatusAddress: []string{},
CertFilePath: "/etc/webhook/certs/cert.pem",
KeyFilePath: "/etc/webhook/certs/key.pem",
EnableProfiling: true,
ApisixResourceSyncInterval: types.TimeDuration{Duration: 200 * time.Second},
Kubernetes: KubernetesConfig{
ResyncInterval: types.TimeDuration{Duration: time.Hour},
Kubeconfig: "/path/to/foo/baz",
Expand Down Expand Up @@ -86,6 +87,7 @@ https_listen: :9443
ingress_publish_service: ""
ingress_status_address: []
enable_profiling: true
apisix-resource-sync-interval: 200s
kubernetes:
kubeconfig: /path/to/foo/baz
resync_interval: 1h0m0s
Expand Down Expand Up @@ -113,15 +115,16 @@ apisix:

func TestConfigWithEnvVar(t *testing.T) {
cfg := &Config{
LogLevel: "warn",
LogOutput: "stdout",
HTTPListen: ":9090",
HTTPSListen: ":9443",
IngressPublishService: "",
IngressStatusAddress: []string{},
CertFilePath: "/etc/webhook/certs/cert.pem",
KeyFilePath: "/etc/webhook/certs/key.pem",
EnableProfiling: true,
LogLevel: "warn",
LogOutput: "stdout",
HTTPListen: ":9090",
HTTPSListen: ":9443",
IngressPublishService: "",
IngressStatusAddress: []string{},
CertFilePath: "/etc/webhook/certs/cert.pem",
KeyFilePath: "/etc/webhook/certs/key.pem",
EnableProfiling: true,
ApisixResourceSyncInterval: types.TimeDuration{Duration: 200 * time.Second},
Kubernetes: KubernetesConfig{
ResyncInterval: types.TimeDuration{Duration: time.Hour},
Kubeconfig: "",
Expand Down Expand Up @@ -160,6 +163,7 @@ func TestConfigWithEnvVar(t *testing.T) {
"ingress_publish_service": "",
"ingress_status_address": [],
"enable_profiling": true,
"apisix-resource-sync-interval": "200s",
"kubernetes": {
"kubeconfig": "{{.KUBECONFIG}}",
"resync_interval": "1h0m0s",
Expand Down Expand Up @@ -195,6 +199,7 @@ https_listen: :9443
ingress_publish_service: ""
ingress_status_address: []
enable_profiling: true
apisix-resource-sync-interval: 200s
kubernetes:
resync_interval: 1h0m0s
kubeconfig: "{{.KUBECONFIG}}"
Expand Down
26 changes: 26 additions & 0 deletions pkg/ingress/apisix_cluster_config.go
Expand Up @@ -403,3 +403,29 @@ func (c *apisixClusterConfigController) onDelete(obj interface{}) {

c.controller.MetricsCollector.IncrEvents("clusterConfig", "delete")
}

func (c *apisixClusterConfigController) ResourceSync() {
objs := c.controller.apisixClusterConfigInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorw("ApisixClusterConfig sync failed, found ApisixClusterConfig resource with bad meta namespace key", zap.String("error", err.Error()))
continue
}
if !c.controller.isWatchingNamespace(key) {
continue
}
acc, err := kube.NewApisixClusterConfig(obj)
if err != nil {
log.Errorw("found ApisixClusterConfig resource with bad type", zap.String("error", err.Error()))
return
}
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: kube.ApisixClusterConfigEvent{
Key: key,
GroupVersion: acc.GroupVersion(),
},
})
}
}
26 changes: 26 additions & 0 deletions pkg/ingress/apisix_consumer.go
Expand Up @@ -318,3 +318,29 @@ func (c *apisixConsumerController) onDelete(obj interface{}) {

c.controller.MetricsCollector.IncrEvents("consumer", "delete")
}

func (c *apisixConsumerController) ResourceSync() {
objs := c.controller.apisixConsumerInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorw("ApisixConsumer sync failed, found ApisixConsumer resource with bad meta namespace key", zap.String("error", err.Error()))
continue
}
if !c.controller.isWatchingNamespace(key) {
continue
}
ac, err := kube.NewApisixConsumer(obj)
if err != nil {
log.Errorw("found ApisixConsumer resource with bad type", zap.String("error", err.Error()))
return
}
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: kube.ApisixConsumerEvent{
Key: key,
GroupVersion: ac.GroupVersion(),
},
})
}
}
22 changes: 22 additions & 0 deletions pkg/ingress/apisix_pluginconfig.go
Expand Up @@ -367,3 +367,25 @@ func (c *apisixPluginConfigController) onDelete(obj interface{}) {

c.controller.MetricsCollector.IncrEvents("PluginConfig", "delete")
}

func (c *apisixPluginConfigController) ResourceSync() {
objs := c.controller.apisixPluginConfigInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorw("ApisixPluginConfig sync failed, found ApisixPluginConfig resource with bad meta namespace key", zap.String("error", err.Error()))
continue
}
if !c.controller.isWatchingNamespace(key) {
continue
}
apc := kube.MustNewApisixPluginConfig(obj)
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: kube.ApisixPluginConfigEvent{
Key: key,
GroupVersion: apc.GroupVersion(),
},
})
}
}
22 changes: 22 additions & 0 deletions pkg/ingress/apisix_route.go
Expand Up @@ -449,3 +449,25 @@ func (c *apisixRouteController) onDelete(obj interface{}) {

c.controller.MetricsCollector.IncrEvents("route", "delete")
}

func (c *apisixRouteController) ResourceSync() {
objs := c.controller.apisixRouteInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorw("ApisixRoute sync failed, found ApisixRoute resource with bad meta namespace key", zap.String("error", err.Error()))
continue
}
if !c.controller.isWatchingNamespace(key) {
continue
}
ar := kube.MustNewApisixRoute(obj)
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: kube.ApisixRouteEvent{
Key: key,
GroupVersion: ar.GroupVersion(),
},
})
}
}
26 changes: 26 additions & 0 deletions pkg/ingress/apisix_tls.go
Expand Up @@ -359,3 +359,29 @@ func (c *apisixTlsController) onDelete(obj interface{}) {

c.controller.MetricsCollector.IncrEvents("TLS", "delete")
}

func (c *apisixTlsController) ResourceSync() {
objs := c.controller.apisixTlsInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorw("ApisixTls sync failed, found ApisixTls object with bad namespace/name ignore it", zap.String("error", err.Error()))
continue
}
if !c.controller.isWatchingNamespace(key) {
continue
}
tls, err := kube.NewApisixTls(obj)
if err != nil {
log.Errorw("ApisixTls sync failed, found ApisixTls resource with bad type", zap.Error(err))
continue
}
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: kube.ApisixTlsEvent{
Key: key,
GroupVersion: tls.GroupVersion(),
},
})
}
}
18 changes: 18 additions & 0 deletions pkg/ingress/apisix_upstream.go
Expand Up @@ -301,3 +301,21 @@ func (c *apisixUpstreamController) onDelete(obj interface{}) {

c.controller.MetricsCollector.IncrEvents("upstream", "delete")
}

func (c *apisixUpstreamController) ResourceSync() {
clusterConfigs := c.controller.apisixUpstreamInformer.GetIndexer().List()
for _, clusterConfig := range clusterConfigs {
key, err := cache.MetaNamespaceKeyFunc(clusterConfig)
if err != nil {
log.Errorw("ApisixUpstream sync failed, found ApisixUpstream resource with bad meta namespace key", zap.String("error", err.Error()))
continue
}
if !c.controller.isWatchingNamespace(key) {
continue
}
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
}
}
60 changes: 59 additions & 1 deletion pkg/ingress/controller.go
Expand Up @@ -64,6 +64,8 @@ const (
_resourceSyncAborted = "ResourceSyncAborted"
// _messageResourceFailed is used to report error
_messageResourceFailed = "%s synced failed, with error: %s"
// minimum interval for ingress sync to APISIX
_mininumApisixResourceSyncInterval = 60 * time.Second
)

// Controller is the ingress apisix controller object.
Expand Down Expand Up @@ -399,7 +401,6 @@ func (c *Controller) Run(stop chan struct{}) error {
ReleaseOnCancel: true,
Name: "ingress-apisix",
}

elector, err := leaderelection.NewLeaderElector(cfg)
if err != nil {
log.Errorf("failed to create leader elector: %s", err.Error())
Expand Down Expand Up @@ -569,6 +570,9 @@ func (c *Controller) run(ctx context.Context) {
c.apisixPluginConfigController.run(ctx)
})

c.goAttach(func() {
c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration)
})
c.MetricsCollector.ResetLeader(true)

log.Infow("controller now is running as leader",
Expand Down Expand Up @@ -727,3 +731,57 @@ func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.
c.MetricsCollector.IncrCheckClusterHealth(c.name)
}
}

func (c *Controller) syncAllResources() {
wg := sync.WaitGroup{}
goAttach := func(handler func()) {
wg.Add(1)
go func() {
defer wg.Done()
handler()
}()
}
goAttach(func() {
c.apisixConsumerController.ResourceSync()
})
goAttach(func() {
c.apisixRouteController.ResourceSync()
})
goAttach(func() {
c.apisixClusterConfigController.ResourceSync()
})
goAttach(func() {
c.apisixPluginConfigController.ResourceSync()
})
goAttach(func() {
c.apisixUpstreamController.ResourceSync()
})
goAttach(func() {
c.apisixTlsController.ResourceSync()
})
goAttach(func() {
c.ingressController.ResourceSync()
})
wg.Wait()
}

func (c *Controller) resourceSyncLoop(ctx context.Context, interval time.Duration) {
// The interval shall not be less than 60 seconds.
if interval < _mininumApisixResourceSyncInterval {
log.Warnw("The apisix-resource-sync-interval shall not be less than 60 seconds.",
zap.String("apisix-resource-sync-interval", interval.String()),
)
interval = _mininumApisixResourceSyncInterval
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.syncAllResources()
continue
case <-ctx.Done():
return
}
}
}

0 comments on commit 6394cdd

Please sign in to comment.