From a66b06fee323ce41dd5f6e072d62bcd375547d27 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 15 Jun 2022 17:22:34 +0800 Subject: [PATCH] api: add Rate-limit config update API (#4843) ref tikv/pd#4666, ref tikv/pd#4839 add Rate-limit config update API Signed-off-by: Cabinfever_B Co-authored-by: Ryan Leung Co-authored-by: Ti Chi Robot --- pkg/jsonutil/jsonutil.go | 49 ++++++++ pkg/jsonutil/jsonutil_test.go | 65 +++++++++++ server/api/config.go | 43 +------ server/api/router.go | 1 + server/api/service_middleware.go | 123 ++++++++++++++++---- server/api/service_middleware_test.go | 160 +++++++++++++++++++++++++- server/server.go | 40 +++++++ 7 files changed, 420 insertions(+), 61 deletions(-) create mode 100644 pkg/jsonutil/jsonutil.go create mode 100644 pkg/jsonutil/jsonutil_test.go diff --git a/pkg/jsonutil/jsonutil.go b/pkg/jsonutil/jsonutil.go new file mode 100644 index 000000000000..c5ae2f378da2 --- /dev/null +++ b/pkg/jsonutil/jsonutil.go @@ -0,0 +1,49 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jsonutil + +import ( + "bytes" + "encoding/json" + + "github.com/tikv/pd/pkg/reflectutil" +) + +// AddKeyValue is used to add a key value pair into `old` +func AddKeyValue(old interface{}, key string, value interface{}) (updated bool, found bool, err error) { + data, err := json.Marshal(map[string]interface{}{key: value}) + if err != nil { + return false, false, err + } + return MergeJSONObject(old, data) +} + +// MergeJSONObject is used to merge a marshaled json object into v +func MergeJSONObject(v interface{}, data []byte) (updated bool, found bool, err error) { + old, _ := json.Marshal(v) + if err := json.Unmarshal(data, v); err != nil { + return false, false, err + } + new, _ := json.Marshal(v) + if !bytes.Equal(old, new) { + return true, true, nil + } + m := make(map[string]interface{}) + if err := json.Unmarshal(data, &m); err != nil { + return false, false, err + } + found = reflectutil.FindSameFieldByJSON(v, m) + return false, found, nil +} diff --git a/pkg/jsonutil/jsonutil_test.go b/pkg/jsonutil/jsonutil_test.go new file mode 100644 index 000000000000..a046fbaf70a4 --- /dev/null +++ b/pkg/jsonutil/jsonutil_test.go @@ -0,0 +1,65 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jsonutil + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +type testJSONStructLevel1 struct { + Name string `json:"name"` + Sub1 testJSONStructLevel2 `json:"sub1"` + Sub2 testJSONStructLevel2 `json:"sub2"` +} + +type testJSONStructLevel2 struct { + SubName string `json:"sub-name"` +} + +func TestJSONUtil(t *testing.T) { + t.Parallel() + re := require.New(t) + father := &testJSONStructLevel1{ + Name: "father", + } + son1 := &testJSONStructLevel2{ + SubName: "son1", + } + update, found, err := AddKeyValue(&father, "sub1", &son1) + re.NoError(err) + re.True(update) + re.True(found) + + son2 := &testJSONStructLevel2{ + SubName: "son2", + } + + update, found, err = AddKeyValue(father, "sub2", &son2) + re.NoError(err) + re.True(update) + re.True(found) + + update, found, err = AddKeyValue(father, "sub3", &son2) + re.NoError(err) + re.False(update) + re.False(found) + + update, found, err = AddKeyValue(father, "sub2", &son2) + re.NoError(err) + re.False(update) + re.True(found) +} diff --git a/server/api/config.go b/server/api/config.go index d4d907352898..b33dd5c5a974 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -15,7 +15,6 @@ package api import ( - "bytes" "encoding/json" "fmt" "io" @@ -29,6 +28,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/pkg/apiutil" + "github.com/tikv/pd/pkg/jsonutil" "github.com/tikv/pd/pkg/logutil" "github.com/tikv/pd/pkg/reflectutil" "github.com/tikv/pd/server" @@ -166,12 +166,7 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa } func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error { - data, err := json.Marshal(map[string]interface{}{key: value}) - if err != nil { - return err - } - - updated, found, err := mergeConfig(&config.Schedule, data) + updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value) if err != nil { return err } @@ -187,12 +182,7 @@ func (h *confHandler) updateSchedule(config *config.Config, key string, value in } func (h *confHandler) updateReplication(config *config.Config, key string, value interface{}) error { - data, err := json.Marshal(map[string]interface{}{key: value}) - if err != nil { - return err - } - - updated, found, err := mergeConfig(&config.Replication, data) + updated, found, err := jsonutil.AddKeyValue(&config.Replication, key, value) if err != nil { return err } @@ -214,8 +204,7 @@ func (h *confHandler) updateReplicationModeConfig(config *config.Config, key []s if err != nil { return err } - - updated, found, err := mergeConfig(&config.ReplicationMode, data) + updated, found, err := jsonutil.MergeJSONObject(&config.ReplicationMode, data) if err != nil { return err } @@ -231,12 +220,7 @@ func (h *confHandler) updateReplicationModeConfig(config *config.Config, key []s } func (h *confHandler) updatePDServerConfig(config *config.Config, key string, value interface{}) error { - data, err := json.Marshal(map[string]interface{}{key: value}) - if err != nil { - return err - } - - updated, found, err := mergeConfig(&config.PDServerCfg, data) + updated, found, err := jsonutil.AddKeyValue(&config.PDServerCfg, key, value) if err != nil { return err } @@ -288,23 +272,6 @@ func getConfigMap(cfg map[string]interface{}, key []string, value interface{}) m return cfg } -func mergeConfig(v interface{}, data []byte) (updated bool, found bool, err error) { - old, _ := json.Marshal(v) - if err := json.Unmarshal(data, v); err != nil { - return false, false, err - } - new, _ := json.Marshal(v) - if !bytes.Equal(old, new) { - return true, true, nil - } - m := make(map[string]interface{}) - if err := json.Unmarshal(data, &m); err != nil { - return false, false, err - } - found = reflectutil.FindSameFieldByJSON(v, m) - return false, found, nil -} - // @Tags config // @Summary Get schedule config. // @Produce json diff --git a/server/api/router.go b/server/api/router.go index e755341ebef4..3e8061fd74e2 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -285,6 +285,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { serviceMiddlewareHandler := newServiceMiddlewareHandler(svr, rd) registerFunc(apiRouter, "/service-middleware/config", serviceMiddlewareHandler.GetServiceMiddlewareConfig, setMethods("GET")) registerFunc(apiRouter, "/service-middleware/config", serviceMiddlewareHandler.SetServiceMiddlewareConfig, setMethods("POST"), setAuditBackend(localLog)) + registerFunc(apiRouter, "/service-middleware/config/rate-limit", serviceMiddlewareHandler.SetRatelimitConfig, setMethods("POST"), setAuditBackend(localLog)) logHandler := newLogHandler(svr, rd) registerFunc(apiRouter, "/admin/log", logHandler.SetLogLevel, setMethods("POST"), setAuditBackend(localLog)) diff --git a/server/api/service_middleware.go b/server/api/service_middleware.go index 0f41f8ae7252..426399a1d6e9 100644 --- a/server/api/service_middleware.go +++ b/server/api/service_middleware.go @@ -23,6 +23,9 @@ import ( "strings" "github.com/pingcap/errors" + "github.com/tikv/pd/pkg/apiutil" + "github.com/tikv/pd/pkg/jsonutil" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/reflectutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" @@ -107,18 +110,13 @@ func (h *serviceMiddlewareHandler) updateServiceMiddlewareConfig(cfg *config.Ser case "audit": return h.updateAudit(cfg, kp[len(kp)-1], value) case "rate-limit": - return h.updateRateLimit(cfg, kp[len(kp)-1], value) + return h.svr.UpdateRateLimit(&cfg.RateLimitConfig, kp[len(kp)-1], value) } return errors.Errorf("config prefix %s not found", kp[0]) } func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareConfig, key string, value interface{}) error { - data, err := json.Marshal(map[string]interface{}{key: value}) - if err != nil { - return err - } - - updated, found, err := mergeConfig(&config.AuditConfig, data) + updated, found, err := jsonutil.AddKeyValue(&config.AuditConfig, key, value) if err != nil { return err } @@ -133,23 +131,104 @@ func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareC return err } -func (h *serviceMiddlewareHandler) updateRateLimit(config *config.ServiceMiddlewareConfig, key string, value interface{}) error { - data, err := json.Marshal(map[string]interface{}{key: value}) - if err != nil { - return err +// @Tags service_middleware +// @Summary update ratelimit config +// @Param body body object string "json params" +// @Produce json +// @Success 200 {string} string +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "config item not found" +// @Router /service-middleware/config/rate-limit [POST] +func (h *serviceMiddlewareHandler) SetRatelimitConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { + return } - - updated, found, err := mergeConfig(&config.RateLimitConfig, data) - if err != nil { - return err + typeStr, ok := input["type"].(string) + if !ok { + h.rd.JSON(w, http.StatusBadRequest, "The type is empty.") + return } - - if !found { - return errors.Errorf("config item %s not found", key) + var serviceLabel string + switch typeStr { + case "label": + serviceLabel, ok = input["label"].(string) + if !ok || len(serviceLabel) == 0 { + h.rd.JSON(w, http.StatusBadRequest, "The label is empty.") + return + } + if len(h.svr.GetServiceLabels(serviceLabel)) == 0 { + h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.") + return + } + case "path": + method, _ := input["method"].(string) + path, ok := input["path"].(string) + if !ok || len(path) == 0 { + h.rd.JSON(w, http.StatusBadRequest, "The path is empty.") + return + } + serviceLabel = h.svr.GetAPIAccessServiceLabel(apiutil.NewAccessPath(path, method)) + if len(serviceLabel) == 0 { + h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.") + return + } + default: + h.rd.JSON(w, http.StatusBadRequest, "The type is invalid.") + return } - - if updated { - err = h.svr.SetRateLimitConfig(config.RateLimitConfig) + if h.svr.IsInRateLimitAllowList(serviceLabel) { + h.rd.JSON(w, http.StatusBadRequest, "This service is in allow list whose config can not be changed.") + return } - return err + cfg := h.svr.GetRateLimitConfig().LimiterConfig[serviceLabel] + // update concurrency limiter + concurrencyUpdatedFlag := "Concurrency limiter is not changed." + concurrencyFloat, okc := input["concurrency"].(float64) + if okc { + cfg.ConcurrencyLimit = uint64(concurrencyFloat) + } + // update qps rate limiter + qpsRateUpdatedFlag := "QPS rate limiter is not changed." + qps, okq := input["qps"].(float64) + if okq { + brust := 0 + if int(qps) > 1 { + brust = int(qps) + } else if qps > 0 { + brust = 1 + } + cfg.QPS = qps + cfg.QPSBurst = brust + } + if !okc && !okq { + h.rd.JSON(w, http.StatusOK, "No changed.") + } else { + status := h.svr.UpdateServiceRateLimiter(serviceLabel, ratelimit.UpdateDimensionConfig(&cfg)) + switch { + case status&ratelimit.QPSChanged != 0: + qpsRateUpdatedFlag = "QPS rate limiter is changed." + case status&ratelimit.QPSDeleted != 0: + qpsRateUpdatedFlag = "QPS rate limiter is deleted." + } + switch { + case status&ratelimit.ConcurrencyChanged != 0: + concurrencyUpdatedFlag = "Concurrency limiter is changed." + case status&ratelimit.ConcurrencyDeleted != 0: + concurrencyUpdatedFlag = "Concurrency limiter is deleted." + } + err := h.svr.UpdateRateLimitConfig("limiter-config", serviceLabel, cfg) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + } else { + result := rateLimitResult{concurrencyUpdatedFlag, qpsRateUpdatedFlag, h.svr.GetServiceMiddlewareConfig().RateLimitConfig.LimiterConfig} + h.rd.JSON(w, http.StatusOK, result) + } + } +} + +type rateLimitResult struct { + ConcurrencyUpdatedFlag string `json:"concurrency"` + QPSRateUpdatedFlag string `json:"qps"` + LimiterConfig map[string]ratelimit.DimensionConfig `json:"limiter-config"` } diff --git a/server/api/service_middleware_test.go b/server/api/service_middleware_test.go index a1d4804650cd..6ea0343f53bf 100644 --- a/server/api/service_middleware_test.go +++ b/server/api/service_middleware_test.go @@ -57,7 +57,8 @@ func (s *testAuditMiddlewareSuite) TestConfigAuditSwitch(c *C) { c.Assert(sc.EnableAudit, Equals, false) ms := map[string]interface{}{ - "enable-audit": "true", + "enable-audit": "true", + "enable-rate-limit": "true", } postData, err := json.Marshal(ms) c.Assert(err, IsNil) @@ -65,8 +66,10 @@ func (s *testAuditMiddlewareSuite) TestConfigAuditSwitch(c *C) { sc = &config.ServiceMiddlewareConfig{} c.Assert(tu.ReadGetJSON(c, testDialClient, addr, sc), IsNil) c.Assert(sc.EnableAudit, Equals, true) + c.Assert(sc.EnableRateLimit, Equals, true) ms = map[string]interface{}{ "audit.enable-audit": "false", + "enable-rate-limit": "false", } postData, err = json.Marshal(ms) c.Assert(err, IsNil) @@ -74,6 +77,7 @@ func (s *testAuditMiddlewareSuite) TestConfigAuditSwitch(c *C) { sc = &config.ServiceMiddlewareConfig{} c.Assert(tu.ReadGetJSON(c, testDialClient, addr, sc), IsNil) c.Assert(sc.EnableAudit, Equals, false) + c.Assert(sc.EnableRateLimit, Equals, false) // test empty ms = map[string]interface{}{} @@ -124,6 +128,160 @@ func (s *testRateLimitConfigSuite) TearDownSuite(c *C) { s.cleanup() } +func (s *testRateLimitConfigSuite) TestUpdateRateLimitConfig(c *C) { + urlPrefix := fmt.Sprintf("%s%s/api/v1/service-middleware/config/rate-limit", s.svr.GetAddr(), apiPrefix) + + // test empty type + input := make(map[string]interface{}) + input["type"] = 123 + jsonBody, err := json.Marshal(input) + c.Assert(err, IsNil) + + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.Status(c, http.StatusBadRequest), tu.StringEqual(c, "\"The type is empty.\"\n")) + c.Assert(err, IsNil) + // test invalid type + input = make(map[string]interface{}) + input["type"] = "url" + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.Status(c, http.StatusBadRequest), tu.StringEqual(c, "\"The type is invalid.\"\n")) + c.Assert(err, IsNil) + + // test empty label + input = make(map[string]interface{}) + input["type"] = "label" + input["label"] = "" + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.Status(c, http.StatusBadRequest), tu.StringEqual(c, "\"The label is empty.\"\n")) + c.Assert(err, IsNil) + // test no label matched + input = make(map[string]interface{}) + input["type"] = "label" + input["label"] = "TestLabel" + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.Status(c, http.StatusBadRequest), tu.StringEqual(c, "\"There is no label matched.\"\n")) + c.Assert(err, IsNil) + + // test empty path + input = make(map[string]interface{}) + input["type"] = "path" + input["path"] = "" + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.Status(c, http.StatusBadRequest), tu.StringEqual(c, "\"The path is empty.\"\n")) + c.Assert(err, IsNil) + + // test path but no label matched + input = make(map[string]interface{}) + input["type"] = "path" + input["path"] = "/pd/api/v1/test" + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.Status(c, http.StatusBadRequest), tu.StringEqual(c, "\"There is no label matched.\"\n")) + c.Assert(err, IsNil) + + // no change + input = make(map[string]interface{}) + input["type"] = "label" + input["label"] = "GetHealthStatus" + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.StatusOK(c), tu.StringEqual(c, "\"No changed.\"\n")) + c.Assert(err, IsNil) + + // change concurrency + input = make(map[string]interface{}) + input["type"] = "path" + input["path"] = "/pd/api/v1/health" + input["method"] = "GET" + input["concurrency"] = 100 + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.StatusOK(c), tu.StringContain(c, "Concurrency limiter is changed.")) + c.Assert(err, IsNil) + input["concurrency"] = 0 + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.StatusOK(c), tu.StringContain(c, "Concurrency limiter is deleted.")) + c.Assert(err, IsNil) + + // change qps + input = make(map[string]interface{}) + input["type"] = "path" + input["path"] = "/pd/api/v1/health" + input["method"] = "GET" + input["qps"] = 100 + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.StatusOK(c), tu.StringContain(c, "QPS rate limiter is changed.")) + c.Assert(err, IsNil) + + input = make(map[string]interface{}) + input["type"] = "path" + input["path"] = "/pd/api/v1/health" + input["method"] = "GET" + input["qps"] = 0.3 + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.StatusOK(c), tu.StringContain(c, "QPS rate limiter is changed.")) + c.Assert(err, IsNil) + c.Assert(s.svr.GetRateLimitConfig().LimiterConfig["GetHealthStatus"].QPSBurst, Equals, 1) + + input["qps"] = -1 + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.StatusOK(c), tu.StringContain(c, "QPS rate limiter is deleted.")) + c.Assert(err, IsNil) + + // change both + input = make(map[string]interface{}) + input["type"] = "path" + input["path"] = "/pd/api/v1/debug/pprof/profile" + input["qps"] = 100 + input["concurrency"] = 100 + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + result := rateLimitResult{} + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.StatusOK(c), tu.StringContain(c, "Concurrency limiter is changed."), + tu.StringContain(c, "QPS rate limiter is changed."), + tu.ExtractJSON(c, &result), + ) + c.Assert(result.LimiterConfig["Profile"].QPS, Equals, 100.) + c.Assert(result.LimiterConfig["Profile"].QPSBurst, Equals, 100) + c.Assert(result.LimiterConfig["Profile"].ConcurrencyLimit, Equals, uint64(100)) + c.Assert(err, IsNil) + + limiter := s.svr.GetServiceRateLimiter() + limiter.Update("SetRatelimitConfig", ratelimit.AddLabelAllowList()) + + // Allow list + input = make(map[string]interface{}) + input["type"] = "label" + input["label"] = "SetRatelimitConfig" + input["qps"] = 100 + input["concurrency"] = 100 + jsonBody, err = json.Marshal(input) + c.Assert(err, IsNil) + err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, + tu.StatusNotOK(c), tu.StringEqual(c, "\"This service is in allow list whose config can not be changed.\"\n")) + c.Assert(err, IsNil) +} + func (s *testRateLimitConfigSuite) TestConfigRateLimitSwitch(c *C) { addr := fmt.Sprintf("%s/service-middleware/config", s.urlPrefix) diff --git a/server/server.go b/server/server.go index c79026925524..871c2e607384 100644 --- a/server/server.go +++ b/server/server.go @@ -44,6 +44,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/grpcutil" + "github.com/tikv/pd/pkg/jsonutil" "github.com/tikv/pd/pkg/logutil" "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/systimemon" @@ -255,6 +256,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha audit.NewLocalLogBackend(true), audit.NewPrometheusHistogramBackend(serviceAuditHistogram, false), } + s.serviceRateLimiter = ratelimit.NewLimiter() s.serviceAuditBackendLabels = make(map[string]*audit.BackendLabels) s.serviceRateLimiter = ratelimit.NewLimiter() s.serviceLabels = make(map[string][]apiutil.AccessPath) @@ -978,6 +980,34 @@ func (s *Server) SetAuditConfig(cfg config.AuditConfig) error { return nil } +// UpdateRateLimitConfig is used to update rate-limit config which will reserve old limiter-config +func (s *Server) UpdateRateLimitConfig(key, label string, value ratelimit.DimensionConfig) error { + cfg := s.GetServiceMiddlewareConfig() + rateLimitCfg := make(map[string]ratelimit.DimensionConfig) + for label, item := range cfg.LimiterConfig { + rateLimitCfg[label] = item + } + rateLimitCfg[label] = value + return s.UpdateRateLimit(&cfg.RateLimitConfig, key, &rateLimitCfg) +} + +// UpdateRateLimit is used to update rate-limit config which will overwrite limiter-config +func (s *Server) UpdateRateLimit(cfg *config.RateLimitConfig, key string, value interface{}) error { + updated, found, err := jsonutil.AddKeyValue(cfg, key, value) + if err != nil { + return err + } + + if !found { + return errors.Errorf("config item %s not found", key) + } + + if updated { + err = s.SetRateLimitConfig(*cfg) + } + return err +} + // GetRateLimitConfig gets the rate limit config information. func (s *Server) GetRateLimitConfig() *config.RateLimitConfig { return s.serviceMiddlewarePersistOptions.GetRateLimitConfig().Clone() @@ -1221,6 +1251,16 @@ func (s *Server) GetServiceRateLimiter() *ratelimit.Limiter { return s.serviceRateLimiter } +// IsInRateLimitAllowList returns whethis given service label is in allow lost +func (s *Server) IsInRateLimitAllowList(serviceLabel string) bool { + return s.serviceRateLimiter.IsInAllowList(serviceLabel) +} + +// UpdateServiceRateLimiter is used to update RateLimiter +func (s *Server) UpdateServiceRateLimiter(serviceLabel string, opts ...ratelimit.Option) ratelimit.UpdateStatus { + return s.serviceRateLimiter.Update(serviceLabel, opts...) +} + // GetClusterStatus gets cluster status. func (s *Server) GetClusterStatus() (*cluster.Status, error) { s.cluster.Lock()