From 7c0073d69cf23da609e3a25ff63c14b9ef0a95d7 Mon Sep 17 00:00:00 2001 From: nevercase <1024769485@qq.com> Date: Wed, 8 Dec 2021 16:07:28 +0800 Subject: [PATCH] feat: implement pluginconfig clients (#638) (#772) --- pkg/api/validation/apisix_route_test.go | 4 + pkg/apisix/apisix.go | 11 + pkg/apisix/cache/cache.go | 8 + pkg/apisix/cache/memdb.go | 28 +++ pkg/apisix/cache/memdb_test.go | 55 +++++ pkg/apisix/cache/schema.go | 16 ++ pkg/apisix/cluster.go | 2 + pkg/apisix/nonexistentclient.go | 94 +++++--- pkg/apisix/pluginconfig.go | 235 +++++++++++++++++++ pkg/apisix/pluginconfig_test.go | 219 +++++++++++++++++ pkg/apisix/resource.go | 10 + pkg/apisix/schema.go | 5 + pkg/types/apisix/v1/types.go | 7 + pkg/types/apisix/v1/zz_generated.deepcopy.go | 18 ++ 14 files changed, 682 insertions(+), 30 deletions(-) create mode 100644 pkg/apisix/pluginconfig.go create mode 100644 pkg/apisix/pluginconfig_test.go diff --git a/pkg/api/validation/apisix_route_test.go b/pkg/api/validation/apisix_route_test.go index f56e6bcf65b..9af3b78346d 100644 --- a/pkg/api/validation/apisix_route_test.go +++ b/pkg/api/validation/apisix_route_test.go @@ -57,6 +57,10 @@ func (c fakeSchemaClient) GetSslSchema(_ context.Context) (*api.Schema, error) { return nil, nil } +func (c fakeSchemaClient) GetPluginConfigSchema(_ context.Context) (*api.Schema, error) { + return nil, nil +} + func newFakeSchemaClient() apisix.Schema { testData := map[string]string{ "api-breaker": `{"required":["break_response_code"],"$comment":"this is a mark for our injected plugin schema","type":"object","properties":{"healthy":{"properties":{"successes":{"minimum":1,"type":"integer","default":3},"http_statuses":{"items":{"minimum":200,"type":"integer","maximum":499},"uniqueItems":true,"type":"array","minItems":1,"default":[200]}},"type":"object","default":{"successes":3,"http_statuses":[200]}},"break_response_code":{"minimum":200,"type":"integer","maximum":599},"max_breaker_sec":{"minimum":3,"type":"integer","default":300},"unhealthy":{"properties":{"failures":{"minimum":1,"type":"integer","default":3},"http_statuses":{"items":{"minimum":500,"type":"integer","maximum":599},"uniqueItems":true,"type":"array","minItems":1,"default":[500]}},"type":"object","default":{"failures":3,"http_statuses":[500]}},"disable":{"type":"boolean"}}}`, diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go index fe89c8656fd..617af3c0151 100644 --- a/pkg/apisix/apisix.go +++ b/pkg/apisix/apisix.go @@ -133,6 +133,17 @@ type Schema interface { GetUpstreamSchema(context.Context) (*v1.Schema, error) GetConsumerSchema(context.Context) (*v1.Schema, error) GetSslSchema(context.Context) (*v1.Schema, error) + GetPluginConfigSchema(ctx context.Context) (*v1.Schema, error) +} + +// PluginConfig is the specific client interface to take over the create, update, +// list and delete for APISIX PluginConfig resource. +type PluginConfig interface { + Get(context.Context, string) (*v1.PluginConfig, error) + List(context.Context) ([]*v1.PluginConfig, error) + Create(context.Context, *v1.PluginConfig) (*v1.PluginConfig, error) + Delete(context.Context, *v1.PluginConfig) error + Update(context.Context, *v1.PluginConfig) (*v1.PluginConfig, error) } type apisix struct { diff --git a/pkg/apisix/cache/cache.go b/pkg/apisix/cache/cache.go index e14857fed10..debae3eea47 100644 --- a/pkg/apisix/cache/cache.go +++ b/pkg/apisix/cache/cache.go @@ -37,6 +37,8 @@ type Cache interface { InsertConsumer(*v1.Consumer) error // InsertSchema adds or updates schema to cache. InsertSchema(*v1.Schema) error + // InsertPluginConfig adds or updates plugin_config to cache. + InsertPluginConfig(*v1.PluginConfig) error // GetRoute finds the route from cache according to the primary index (id). GetRoute(string) (*v1.Route, error) @@ -52,6 +54,8 @@ type Cache interface { GetConsumer(string) (*v1.Consumer, error) // GetSchema finds the scheme from cache according to the primary index (id). GetSchema(string) (*v1.Schema, error) + // GetPluginConfig finds the plugin_config from cache according to the primary index (id). + GetPluginConfig(string) (*v1.PluginConfig, error) // ListRoutes lists all routes in cache. ListRoutes() ([]*v1.Route, error) @@ -67,6 +71,8 @@ type Cache interface { ListConsumers() ([]*v1.Consumer, error) // ListSchema lists all schema in cache. ListSchema() ([]*v1.Schema, error) + // ListPluginConfig lists all plugin_config in cache. + ListPluginConfigs() ([]*v1.PluginConfig, error) // DeleteRoute deletes the specified route in cache. DeleteRoute(*v1.Route) error @@ -82,4 +88,6 @@ type Cache interface { DeleteConsumer(*v1.Consumer) error // DeleteSchema deletes the specified schema in cache. DeleteSchema(*v1.Schema) error + // DeletePluginConfig deletes the specified plugin_config in cache. + DeletePluginConfig(*v1.PluginConfig) error } diff --git a/pkg/apisix/cache/memdb.go b/pkg/apisix/cache/memdb.go index 17f9cd38fa6..4557d13cc69 100644 --- a/pkg/apisix/cache/memdb.go +++ b/pkg/apisix/cache/memdb.go @@ -74,6 +74,10 @@ func (c *dbCache) InsertSchema(schema *v1.Schema) error { return c.insert("schema", schema.DeepCopy()) } +func (c *dbCache) InsertPluginConfig(pc *v1.PluginConfig) error { + return c.insert("plugin_config", pc.DeepCopy()) +} + func (c *dbCache) insert(table string, obj interface{}) error { txn := c.db.Txn(true) defer txn.Abort() @@ -140,6 +144,14 @@ func (c *dbCache) GetSchema(name string) (*v1.Schema, error) { return obj.(*v1.Schema).DeepCopy(), nil } +func (c *dbCache) GetPluginConfig(name string) (*v1.PluginConfig, error) { + obj, err := c.get("plugin_config", name) + if err != nil { + return nil, err + } + return obj.(*v1.PluginConfig).DeepCopy(), nil +} + func (c *dbCache) get(table, id string) (interface{}, error) { txn := c.db.Txn(false) defer txn.Abort() @@ -240,6 +252,18 @@ func (c *dbCache) ListSchema() ([]*v1.Schema, error) { return schemaList, nil } +func (c *dbCache) ListPluginConfigs() ([]*v1.PluginConfig, error) { + raws, err := c.list("plugin_config") + if err != nil { + return nil, err + } + pluginConfigs := make([]*v1.PluginConfig, 0, len(raws)) + for _, raw := range raws { + pluginConfigs = append(pluginConfigs, raw.(*v1.PluginConfig).DeepCopy()) + } + return pluginConfigs, nil +} + func (c *dbCache) list(table string) ([]interface{}, error) { txn := c.db.Txn(false) defer txn.Abort() @@ -285,6 +309,10 @@ func (c *dbCache) DeleteSchema(schema *v1.Schema) error { return c.delete("schema", schema) } +func (c *dbCache) DeletePluginConfig(pc *v1.PluginConfig) error { + return c.delete("plugin_config", pc) +} + func (c *dbCache) delete(table string, obj interface{}) error { txn := c.db.Txn(true) defer txn.Abort() diff --git a/pkg/apisix/cache/memdb_test.go b/pkg/apisix/cache/memdb_test.go index 3e8494c7136..25b731fb0c9 100644 --- a/pkg/apisix/cache/memdb_test.go +++ b/pkg/apisix/cache/memdb_test.go @@ -389,3 +389,58 @@ func TestMemDBCacheSchema(t *testing.T) { } assert.Error(t, ErrNotFound, c.DeleteSchema(s4)) } + +func TestMemDBCachePluginConfig(t *testing.T) { + c, err := NewMemDBCache() + assert.Nil(t, err, "NewMemDBCache") + + pc1 := &v1.PluginConfig{ + Metadata: v1.Metadata{ + ID: "1", + Name: "name1", + }, + } + assert.Nil(t, c.InsertPluginConfig(pc1), "inserting plugin_config pc1") + + pc11, err := c.GetPluginConfig("1") + assert.Nil(t, err) + assert.Equal(t, pc1, pc11) + + pc2 := &v1.PluginConfig{ + Metadata: v1.Metadata{ + ID: "2", + Name: "name2", + }, + } + pc3 := &v1.PluginConfig{ + Metadata: v1.Metadata{ + ID: "3", + Name: "name3", + }, + } + assert.Nil(t, c.InsertPluginConfig(pc2), "inserting plugin_config pc2") + assert.Nil(t, c.InsertPluginConfig(pc3), "inserting plugin_config pc3") + + pc22, err := c.GetPluginConfig("2") + assert.Nil(t, err) + assert.Equal(t, pc2, pc22) + + assert.Nil(t, c.DeletePluginConfig(pc3), "delete plugin_config pc3") + + pcList, err := c.ListPluginConfigs() + assert.Nil(t, err, "listing plugin_config") + + if pcList[0].Name > pcList[1].Name { + pcList[0], pcList[1] = pcList[1], pcList[0] + } + assert.Equal(t, pcList[0], pc1) + assert.Equal(t, pcList[1], pc2) + + pc4 := &v1.PluginConfig{ + Metadata: v1.Metadata{ + ID: "4", + Name: "name4", + }, + } + assert.Error(t, ErrNotFound, c.DeletePluginConfig(pc4)) +} diff --git a/pkg/apisix/cache/schema.go b/pkg/apisix/cache/schema.go index 6b9e8ba21c7..66c358893fb 100644 --- a/pkg/apisix/cache/schema.go +++ b/pkg/apisix/cache/schema.go @@ -116,6 +116,22 @@ var ( }, }, }, + "plugin_config": { + Name: "plugin_config", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "ID"}, + }, + "name": { + Name: "name", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "Name"}, + AllowMissing: true, + }, + }, + }, }, } ) diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go index 42a7f7ae2dd..194a9068b56 100644 --- a/pkg/apisix/cluster.go +++ b/pkg/apisix/cluster.go @@ -99,6 +99,7 @@ type cluster struct { consumer Consumer plugin Plugin schema Schema + pluginConfig PluginConfig metricsCollector metrics.Collector } @@ -140,6 +141,7 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) { c.consumer = newConsumerClient(c) c.plugin = newPluginClient(c) c.schema = newSchemaClient(c) + c.pluginConfig = newPluginConfigClient(c) c.cache, err = cache.NewMemDBCache() if err != nil { diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go index 64ce7bb2611..445ff62cb48 100644 --- a/pkg/apisix/nonexistentclient.go +++ b/pkg/apisix/nonexistentclient.go @@ -29,27 +29,29 @@ type nonExistentCluster struct { func newNonExistentCluster() *nonExistentCluster { return &nonExistentCluster{ embedDummyResourceImplementer{ - route: &dummyRoute{}, - ssl: &dummySSL{}, - upstream: &dummyUpstream{}, - streamRoute: &dummyStreamRoute{}, - globalRule: &dummyGlobalRule{}, - consumer: &dummyConsumer{}, - plugin: &dummyPlugin{}, - schema: &dummySchema{}, + route: &dummyRoute{}, + ssl: &dummySSL{}, + upstream: &dummyUpstream{}, + streamRoute: &dummyStreamRoute{}, + globalRule: &dummyGlobalRule{}, + consumer: &dummyConsumer{}, + plugin: &dummyPlugin{}, + schema: &dummySchema{}, + pluginConfig: &dummyPluginConfig{}, }, } } type embedDummyResourceImplementer struct { - route Route - ssl SSL - upstream Upstream - streamRoute StreamRoute - globalRule GlobalRule - consumer Consumer - plugin Plugin - schema Schema + route Route + ssl SSL + upstream Upstream + streamRoute StreamRoute + globalRule GlobalRule + consumer Consumer + plugin Plugin + schema Schema + pluginConfig PluginConfig } type dummyRoute struct{} @@ -212,6 +214,32 @@ func (f *dummySchema) GetSslSchema(_ context.Context) (*v1.Schema, error) { return nil, ErrClusterNotExist } +func (f *dummySchema) GetPluginConfigSchema(_ context.Context) (*v1.Schema, error) { + return nil, ErrClusterNotExist +} + +type dummyPluginConfig struct{} + +func (f *dummyPluginConfig) Get(_ context.Context, _ string) (*v1.PluginConfig, error) { + return nil, ErrClusterNotExist +} + +func (f *dummyPluginConfig) List(_ context.Context) ([]*v1.PluginConfig, error) { + return nil, ErrClusterNotExist +} + +func (f *dummyPluginConfig) Create(_ context.Context, _ *v1.PluginConfig) (*v1.PluginConfig, error) { + return nil, ErrClusterNotExist +} + +func (f *dummyPluginConfig) Delete(_ context.Context, _ *v1.PluginConfig) error { + return ErrClusterNotExist +} + +func (f *dummyPluginConfig) Update(_ context.Context, _ *v1.PluginConfig) (*v1.PluginConfig, error) { + return nil, ErrClusterNotExist +} + func (nc *nonExistentCluster) Route() Route { return nc.route } @@ -267,6 +295,7 @@ func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error { return func (c *dummyCache) InsertGlobalRule(_ *v1.GlobalRule) error { return nil } func (c *dummyCache) InsertConsumer(_ *v1.Consumer) error { return nil } func (c *dummyCache) InsertSchema(_ *v1.Schema) error { return nil } +func (c *dummyCache) InsertPluginConfig(_ *v1.PluginConfig) error { return nil } func (c *dummyCache) GetRoute(_ string) (*v1.Route, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound } @@ -274,17 +303,22 @@ func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { return func (c *dummyCache) GetGlobalRule(_ string) (*v1.GlobalRule, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetConsumer(_ string) (*v1.Consumer, error) { return nil, cache.ErrNotFound } func (c *dummyCache) GetSchema(_ string) (*v1.Schema, error) { return nil, cache.ErrNotFound } -func (c *dummyCache) ListRoutes() ([]*v1.Route, error) { return nil, nil } -func (c *dummyCache) ListSSL() ([]*v1.Ssl, error) { return nil, nil } -func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error) { return nil, nil } -func (c *dummyCache) ListStreamRoutes() ([]*v1.StreamRoute, error) { return nil, nil } -func (c *dummyCache) ListGlobalRules() ([]*v1.GlobalRule, error) { return nil, nil } -func (c *dummyCache) ListConsumers() ([]*v1.Consumer, error) { return nil, nil } -func (c *dummyCache) ListSchema() ([]*v1.Schema, error) { return nil, nil } -func (c *dummyCache) DeleteRoute(_ *v1.Route) error { return nil } -func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error { return nil } -func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return nil } -func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error { return nil } -func (c *dummyCache) DeleteGlobalRule(_ *v1.GlobalRule) error { return nil } -func (c *dummyCache) DeleteConsumer(_ *v1.Consumer) error { return nil } -func (c *dummyCache) DeleteSchema(_ *v1.Schema) error { return nil } +func (c *dummyCache) GetPluginConfig(_ string) (*v1.PluginConfig, error) { + return nil, cache.ErrNotFound +} +func (c *dummyCache) ListRoutes() ([]*v1.Route, error) { return nil, nil } +func (c *dummyCache) ListSSL() ([]*v1.Ssl, error) { return nil, nil } +func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error) { return nil, nil } +func (c *dummyCache) ListStreamRoutes() ([]*v1.StreamRoute, error) { return nil, nil } +func (c *dummyCache) ListGlobalRules() ([]*v1.GlobalRule, error) { return nil, nil } +func (c *dummyCache) ListConsumers() ([]*v1.Consumer, error) { return nil, nil } +func (c *dummyCache) ListSchema() ([]*v1.Schema, error) { return nil, nil } +func (c *dummyCache) ListPluginConfigs() ([]*v1.PluginConfig, error) { return nil, nil } +func (c *dummyCache) DeleteRoute(_ *v1.Route) error { return nil } +func (c *dummyCache) DeleteSSL(_ *v1.Ssl) error { return nil } +func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return nil } +func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error { return nil } +func (c *dummyCache) DeleteGlobalRule(_ *v1.GlobalRule) error { return nil } +func (c *dummyCache) DeleteConsumer(_ *v1.Consumer) error { return nil } +func (c *dummyCache) DeleteSchema(_ *v1.Schema) error { return nil } +func (c *dummyCache) DeletePluginConfig(_ *v1.PluginConfig) error { return nil } diff --git a/pkg/apisix/pluginconfig.go b/pkg/apisix/pluginconfig.go new file mode 100644 index 00000000000..52406cecd7d --- /dev/null +++ b/pkg/apisix/pluginconfig.go @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apisix + +import ( + "bytes" + "context" + "encoding/json" + + "go.uber.org/zap" + + "github.com/apache/apisix-ingress-controller/pkg/apisix/cache" + "github.com/apache/apisix-ingress-controller/pkg/id" + "github.com/apache/apisix-ingress-controller/pkg/log" + v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" +) + +type pluginConfigClient struct { + url string + cluster *cluster +} + +func newPluginConfigClient(c *cluster) PluginConfig { + return &pluginConfigClient{ + url: c.baseURL + "/plugin_configs", + cluster: c, + } +} + +// Get returns the v1.PluginConfig. +// FIXME, currently if caller pass a non-existent resource, the Get always passes +// through cache. +func (pc *pluginConfigClient) Get(ctx context.Context, name string) (*v1.PluginConfig, error) { + log.Debugw("try to look up pluginConfig", + zap.String("name", name), + zap.String("url", pc.url), + zap.String("cluster", "default"), + ) + rid := id.GenID(name) + pluginConfig, err := pc.cluster.cache.GetPluginConfig(rid) + if err == nil { + return pluginConfig, nil + } + if err != cache.ErrNotFound { + log.Errorw("failed to find pluginConfig in cache, will try to lookup from APISIX", + zap.String("name", name), + zap.Error(err), + ) + } else { + log.Debugw("pluginConfig not found in cache, will try to lookup from APISIX", + zap.String("name", name), + zap.Error(err), + ) + } + + // TODO Add mutex here to avoid dog-pile effect. + url := pc.url + "/" + rid + resp, err := pc.cluster.getResource(ctx, url, "pluginConfig") + pc.cluster.metricsCollector.IncrAPISIXRequest("pluginConfig") + if err != nil { + if err == cache.ErrNotFound { + log.Warnw("pluginConfig not found", + zap.String("name", name), + zap.String("url", url), + zap.String("cluster", "default"), + ) + } else { + log.Errorw("failed to get pluginConfig from APISIX", + zap.String("name", name), + zap.String("url", url), + zap.String("cluster", "default"), + zap.Error(err), + ) + } + return nil, err + } + + pluginConfig, err = resp.Item.pluginConfig() + if err != nil { + log.Errorw("failed to convert pluginConfig item", + zap.String("url", pc.url), + zap.String("pluginConfig_key", resp.Item.Key), + zap.String("pluginConfig_value", string(resp.Item.Value)), + zap.Error(err), + ) + return nil, err + } + + if err := pc.cluster.cache.InsertPluginConfig(pluginConfig); err != nil { + log.Errorf("failed to reflect pluginConfig create to cache: %s", err) + return nil, err + } + return pluginConfig, nil +} + +// List is only used in cache warming up. So here just pass through +// to APISIX. +func (pc *pluginConfigClient) List(ctx context.Context) ([]*v1.PluginConfig, error) { + log.Debugw("try to list pluginConfig in APISIX", + zap.String("cluster", "default"), + zap.String("url", pc.url), + ) + pluginConfigItems, err := pc.cluster.listResource(ctx, pc.url, "pluginConfig") + pc.cluster.metricsCollector.IncrAPISIXRequest("pluginConfig") + if err != nil { + log.Errorf("failed to list pluginConfig: %s", err) + return nil, err + } + + var items []*v1.PluginConfig + for i, item := range pluginConfigItems.Node.Items { + pluginConfig, err := item.pluginConfig() + if err != nil { + log.Errorw("failed to convert pluginConfig item", + zap.String("url", pc.url), + zap.String("pluginConfig_key", item.Key), + zap.String("pluginConfig_value", string(item.Value)), + zap.Error(err), + ) + return nil, err + } + + items = append(items, pluginConfig) + log.Debugf("list pluginConfig #%d, body: %s", i, string(item.Value)) + } + + return items, nil +} + +func (pc *pluginConfigClient) Create(ctx context.Context, obj *v1.PluginConfig) (*v1.PluginConfig, error) { + log.Debugw("try to create pluginConfig", + zap.String("name", obj.Name), + zap.Any("plugins", obj.Plugins), + zap.String("cluster", "default"), + zap.String("url", pc.url), + ) + + if err := pc.cluster.HasSynced(ctx); err != nil { + return nil, err + } + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + + url := pc.url + "/" + obj.ID + log.Debugw("creating pluginConfig", zap.ByteString("body", data), zap.String("url", url)) + resp, err := pc.cluster.createResource(ctx, url, "pluginConfig", bytes.NewReader(data)) + pc.cluster.metricsCollector.IncrAPISIXRequest("pluginConfig") + if err != nil { + log.Errorf("failed to create pluginConfig: %s", err) + return nil, err + } + + pluginConfig, err := resp.Item.pluginConfig() + if err != nil { + return nil, err + } + if err := pc.cluster.cache.InsertPluginConfig(pluginConfig); err != nil { + log.Errorf("failed to reflect pluginConfig create to cache: %s", err) + return nil, err + } + return pluginConfig, nil +} + +func (pc *pluginConfigClient) Delete(ctx context.Context, obj *v1.PluginConfig) error { + log.Debugw("try to delete pluginConfig", + zap.String("id", obj.ID), + zap.String("name", obj.Name), + zap.String("cluster", "default"), + zap.String("url", pc.url), + ) + if err := pc.cluster.HasSynced(ctx); err != nil { + return err + } + url := pc.url + "/" + obj.ID + if err := pc.cluster.deleteResource(ctx, url, "pluginConfig"); err != nil { + pc.cluster.metricsCollector.IncrAPISIXRequest("pluginConfig") + return err + } + pc.cluster.metricsCollector.IncrAPISIXRequest("pluginConfig") + if err := pc.cluster.cache.DeletePluginConfig(obj); err != nil { + log.Errorf("failed to reflect pluginConfig delete to cache: %s", err) + if err != cache.ErrNotFound { + return err + } + } + return nil +} + +func (pc *pluginConfigClient) Update(ctx context.Context, obj *v1.PluginConfig) (*v1.PluginConfig, error) { + log.Debugw("try to update pluginConfig", + zap.String("id", obj.ID), + zap.String("name", obj.Name), + zap.Any("plugins", obj.Plugins), + zap.String("cluster", "default"), + zap.String("url", pc.url), + ) + if err := pc.cluster.HasSynced(ctx); err != nil { + return nil, err + } + body, err := json.Marshal(obj) + if err != nil { + return nil, err + } + url := pc.url + "/" + obj.ID + log.Debugw("updating pluginConfig", zap.ByteString("body", body), zap.String("url", url)) + resp, err := pc.cluster.updateResource(ctx, url, "pluginConfig", bytes.NewReader(body)) + pc.cluster.metricsCollector.IncrAPISIXRequest("pluginConfig") + if err != nil { + return nil, err + } + pluginConfig, err := resp.Item.pluginConfig() + if err != nil { + return nil, err + } + if err := pc.cluster.cache.InsertPluginConfig(pluginConfig); err != nil { + log.Errorf("failed to reflect pluginConfig update to cache: %s", err) + return nil, err + } + return pluginConfig, nil +} diff --git a/pkg/apisix/pluginconfig_test.go b/pkg/apisix/pluginconfig_test.go new file mode 100644 index 00000000000..39d6f117899 --- /dev/null +++ b/pkg/apisix/pluginconfig_test.go @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package apisix + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "sort" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/net/nettest" + + "github.com/apache/apisix-ingress-controller/pkg/metrics" + v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1" +) + +type fakeAPISIXPluginConfigSrv struct { + pluginConfig map[string]json.RawMessage +} + +func (srv *fakeAPISIXPluginConfigSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + if !strings.HasPrefix(r.URL.Path, "/apisix/admin/plugin_configs") { + w.WriteHeader(http.StatusNotFound) + return + } + + if r.Method == http.MethodGet { + resp := fakeListResp{ + Count: strconv.Itoa(len(srv.pluginConfig)), + Node: fakeNode{ + Key: "/apisix/plugin_configs", + }, + } + var keys []string + for key := range srv.pluginConfig { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + resp.Node.Items = append(resp.Node.Items, fakeItem{ + Key: key, + Value: srv.pluginConfig[key], + }) + } + w.WriteHeader(http.StatusOK) + data, _ := json.Marshal(resp) + _, _ = w.Write(data) + return + } + + if r.Method == http.MethodDelete { + id := strings.TrimPrefix(r.URL.Path, "/apisix/admin/plugin_configs/") + id = "/apisix/plugin_configs/" + id + code := http.StatusNotFound + if _, ok := srv.pluginConfig[id]; ok { + delete(srv.pluginConfig, id) + code = http.StatusOK + } + w.WriteHeader(code) + } + + if r.Method == http.MethodPut { + paths := strings.Split(r.URL.Path, "/") + key := fmt.Sprintf("/apisix/plugin_configs/%s", paths[len(paths)-1]) + data, _ := ioutil.ReadAll(r.Body) + srv.pluginConfig[key] = data + w.WriteHeader(http.StatusCreated) + resp := fakeCreateResp{ + Action: "create", + Node: fakeItem{ + Key: key, + Value: json.RawMessage(data), + }, + } + data, _ = json.Marshal(resp) + _, _ = w.Write(data) + return + } + + if r.Method == http.MethodPatch { + id := strings.TrimPrefix(r.URL.Path, "/apisix/admin/plugin_configs/") + id = "/apisix/plugin_configs/" + id + if _, ok := srv.pluginConfig[id]; !ok { + w.WriteHeader(http.StatusNotFound) + return + } + + data, _ := ioutil.ReadAll(r.Body) + srv.pluginConfig[id] = data + + w.WriteHeader(http.StatusOK) + output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, id, string(data)) + _, _ = w.Write([]byte(output)) + return + } +} + +func runFakePluginConfigSrv(t *testing.T) *http.Server { + srv := &fakeAPISIXPluginConfigSrv{ + pluginConfig: make(map[string]json.RawMessage), + } + + ln, _ := nettest.NewLocalListener("tcp") + + httpSrv := &http.Server{ + Addr: ln.Addr().String(), + Handler: srv, + } + + go func() { + if err := httpSrv.Serve(ln); err != nil && err != http.ErrServerClosed { + t.Errorf("failed to run http server: %s", err) + } + }() + + return httpSrv +} + +func TestPluginConfigClient(t *testing.T) { + srv := runFakePluginConfigSrv(t) + defer func() { + assert.Nil(t, srv.Shutdown(context.Background())) + }() + + u := url.URL{ + Scheme: "http", + Host: srv.Addr, + Path: "/apisix/admin", + } + + closedCh := make(chan struct{}) + close(closedCh) + cli := newPluginConfigClient(&cluster{ + baseURL: u.String(), + cli: http.DefaultClient, + cache: &dummyCache{}, + cacheSynced: closedCh, + metricsCollector: metrics.NewPrometheusCollector(), + }) + + // Create + obj, err := cli.Create(context.Background(), &v1.PluginConfig{ + Metadata: v1.Metadata{ + ID: "1", + Name: "test", + }, + Plugins: map[string]interface{}{ + "abc": "123", + }, + }) + assert.Nil(t, err) + assert.Equal(t, obj.ID, "1") + + obj, err = cli.Create(context.Background(), &v1.PluginConfig{ + Metadata: v1.Metadata{ + ID: "2", + Name: "test", + }, + Plugins: map[string]interface{}{ + "abc2": "123", + }, + }) + assert.Nil(t, err) + assert.Equal(t, obj.ID, "2") + + // List + objs, err := cli.List(context.Background()) + assert.Nil(t, err) + assert.Len(t, objs, 2) + assert.Equal(t, objs[0].ID, "1") + assert.Equal(t, objs[1].ID, "2") + + // Delete then List + assert.Nil(t, cli.Delete(context.Background(), objs[0])) + objs, err = cli.List(context.Background()) + assert.Nil(t, err) + assert.Len(t, objs, 1) + assert.Equal(t, "2", objs[0].ID) + + // Patch then List + up := &v1.PluginConfig{ + Metadata: v1.Metadata{ + ID: "2", + Name: "test", + }, + Plugins: map[string]interface{}{ + "abc2": "456", + "key2": "test update PluginConfig", + }, + } + _, err = cli.Update(context.Background(), up) + assert.Nil(t, err) + objs, err = cli.List(context.Background()) + assert.Nil(t, err) + assert.Len(t, objs, 1) + assert.Equal(t, "2", objs[0].ID) + assert.Equal(t, up.Plugins, objs[0].Plugins) +} diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go index 3792de6093e..c4e67cb8aeb 100644 --- a/pkg/apisix/resource.go +++ b/pkg/apisix/resource.go @@ -173,3 +173,13 @@ func (i *item) consumer() (*v1.Consumer, error) { } return &consumer, nil } + +// pluginConfig decodes item.Value and converts it to v1.PluginConfig. +func (i *item) pluginConfig() (*v1.PluginConfig, error) { + log.Debugf("got pluginConfig: %s", string(i.Value)) + var pluginConfig v1.PluginConfig + if err := json.Unmarshal(i.Value, &pluginConfig); err != nil { + return nil, err + } + return &pluginConfig, nil +} diff --git a/pkg/apisix/schema.go b/pkg/apisix/schema.go index 1a292f79e37..82597f0fa0f 100644 --- a/pkg/apisix/schema.go +++ b/pkg/apisix/schema.go @@ -110,3 +110,8 @@ func (sc schemaClient) GetConsumerSchema(ctx context.Context) (*v1.Schema, error func (sc schemaClient) GetSslSchema(ctx context.Context) (*v1.Schema, error) { return sc.getSchema(ctx, "ssl") } + +// GetPluginConfigSchema returns PluginConfig's schema. +func (sc schemaClient) GetPluginConfigSchema(ctx context.Context) (*v1.Schema, error) { + return sc.getSchema(ctx, "pluginConfig") +} diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go index d9e5566a9a5..99c99a9acc3 100644 --- a/pkg/types/apisix/v1/types.go +++ b/pkg/types/apisix/v1/types.go @@ -353,6 +353,13 @@ type Consumer struct { Plugins Plugins `json:"plugins,omitempty" yaml:"plugins,omitempty"` } +// PluginConfig apisix plugin object +// +k8s:deepcopy-gen=true +type PluginConfig struct { + Metadata `json:",inline" yaml:",inline"` + Plugins Plugins `json:"plugins,omitempty" yaml:"plugins,omitempty"` +} + // NewDefaultUpstream returns an empty Upstream with default values. func NewDefaultUpstream() *Upstream { return &Upstream{ diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go b/pkg/types/apisix/v1/zz_generated.deepcopy.go index a30ef8ac0b8..e978ae9a027 100644 --- a/pkg/types/apisix/v1/zz_generated.deepcopy.go +++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go @@ -189,6 +189,24 @@ func (in *MutualTLSClientConfig) DeepCopy() *MutualTLSClientConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PluginConfig) DeepCopyInto(out *PluginConfig) { + *out = *in + in.Metadata.DeepCopyInto(&out.Metadata) + in.Plugins.DeepCopyInto(&out.Plugins) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PluginConfig. +func (in *PluginConfig) DeepCopy() *PluginConfig { + if in == nil { + return nil + } + out := new(PluginConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RedirectConfig) DeepCopyInto(out *RedirectConfig) { *out = *in