Skip to content

Commit

Permalink
refactor rate limiting (#545)
Browse files Browse the repository at this point in the history
  • Loading branch information
tianxiaoliang committed Mar 5, 2019
1 parent ac9e3fc commit f7b8308
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 97 deletions.
13 changes: 11 additions & 2 deletions control/archaius/panel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package archaius
import (
"github.com/go-chassis/go-archaius"
"github.com/go-chassis/go-chassis/control"
"github.com/go-chassis/go-chassis/core/common"
"github.com/go-chassis/go-chassis/core/config"
"github.com/go-chassis/go-chassis/core/config/model"
"github.com/go-chassis/go-chassis/core/invocation"
Expand Down Expand Up @@ -53,8 +54,16 @@ func (p *Panel) GetLoadBalancing(inv invocation.Invocation) control.LoadBalancin
func (p *Panel) GetRateLimiting(inv invocation.Invocation, serviceType string) control.RateLimitingConfig {
rl := control.RateLimitingConfig{}
rl.Enabled = archaius.GetBool("cse.flowcontrol."+serviceType+".qps.enabled", true)
operationMeta := qpslimiter.InitSchemaOperations(&inv)
rl.Rate, rl.Key = qpslimiter.GetQPSTrafficLimiter().GetQPSRateWithPriority(operationMeta)
if serviceType == common.Consumer {
keys := qpslimiter.GetConsumerKey(inv.SourceMicroService, inv.MicroServiceName, inv.SchemaID, inv.OperationID)
rl.Rate, rl.Key = qpslimiter.GetQPSTrafficLimiter().GetQPSRateWithPriority(
keys.OperationQualifiedName, keys.SchemaQualifiedName, keys.MicroServiceName)
} else {
keys := qpslimiter.GetProviderKey(inv.SourceMicroService)
rl.Rate, rl.Key = qpslimiter.GetQPSTrafficLimiter().GetQPSRateWithPriority(
keys.ServiceOriented, keys.Global)
}

return rl
}

Expand Down
37 changes: 8 additions & 29 deletions core/handler/qps_provider_flow_control_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package handler

import (
"fmt"
"github.com/go-chassis/go-archaius"
"github.com/go-chassis/go-chassis/control"
"github.com/go-chassis/go-chassis/core/common"
"github.com/go-chassis/go-chassis/core/invocation"
"github.com/go-chassis/go-chassis/core/qpslimiter"
"net/http"
Expand All @@ -11,38 +12,16 @@ import (
// ProviderRateLimiterHandler provider rate limiter handler
type ProviderRateLimiterHandler struct{}

// constant for provider qps limiter keys
const (
ProviderQPSLimit = "cse.flowcontrol.Provider.qps.limit"
ProviderLimitKeyGlobal = "cse.flowcontrol.Provider.qps.global.limit"
)

// Handle is to handle provider rateLimiter things
func (rl *ProviderRateLimiterHandler) Handle(chain *Chain, i *invocation.Invocation, cb invocation.ResponseCallBack) {
if !archaius.GetBool("cse.flowcontrol.Provider.qps.enabled", true) {
rlc := control.DefaultPanel.GetRateLimiting(*i, common.Provider)
if !rlc.Enabled {
chain.Next(i, cb)
return
}

//provider has limiter only on microservice name.
key := ProviderLimitKeyGlobal
rate := qpslimiter.DefaultRate
ok := false
if i.SourceMicroService != "" {
//use chassis Invoker will send SourceMicroService through network
key = ProviderQPSLimit + "." + i.SourceMicroService
if rate, ok = qpslimiter.GetQPSRate(key); !ok {
key = ProviderLimitKeyGlobal
rate, _ = qpslimiter.GetQPSRate(ProviderLimitKeyGlobal)
}

} else {
key = ProviderLimitKeyGlobal
rate, _ = qpslimiter.GetQPSRate(key)
return
}

//qps rate <=0
if rate <= 0 {
if rlc.Rate <= 0 {
switch i.Reply.(type) {
case *http.Response:
resp := i.Reply.(*http.Response)
Expand All @@ -51,11 +30,11 @@ func (rl *ProviderRateLimiterHandler) Handle(chain *Chain, i *invocation.Invocat

r := &invocation.Response{}
r.Status = http.StatusTooManyRequests
r.Err = fmt.Errorf("%s | %v", key, rate)
r.Err = fmt.Errorf("%s | %v", rlc.Key, rlc.Rate)
cb(r)
return
}
qpslimiter.GetQPSTrafficLimiter().ProcessQPSTokenReq(key, rate)
qpslimiter.GetQPSTrafficLimiter().ProcessQPSTokenReq(rlc.Key, rlc.Rate)
//call next chain
chain.Next(i, cb)

Expand Down
62 changes: 33 additions & 29 deletions core/qpslimiter/consumer_qps_operation_meta.go
Original file line number Diff line number Diff line change
@@ -1,64 +1,68 @@
package qpslimiter

import (
"github.com/go-chassis/go-chassis/core/common"
"strings"

"github.com/go-chassis/go-chassis/core/invocation"
)

// OperationMeta operation meta struct
type OperationMeta struct {
// ConsumerKeys contain consumer keys
type ConsumerKeys struct {
MicroServiceName string
SchemaQualifiedName string
OperationQualifiedName string
}

// GetSpecificKey get specific key
func GetSpecificKey(sourceName, serviceType, serviceName, schemaID, OperationID string) string {
var cmd = "cse.flowcontrol"
// ProviderKeys contain provider keys
type ProviderKeys struct {
Global string
ServiceOriented string
}

//Prefix is const
const Prefix = "cse.flowcontrol"

// GetConsumerKey get specific key for consumer
func GetConsumerKey(sourceName, serviceName, schemaID, OperationID string) *ConsumerKeys {
keys := new(ConsumerKeys)
//for mesher to govern
if sourceName != "" {
cmd = strings.Join([]string{cmd, sourceName, serviceType, "qps.limit"}, ".")
keys.MicroServiceName = strings.Join([]string{Prefix, sourceName, common.Consumer, "qps.limit", serviceName}, ".")
} else {
cmd = strings.Join([]string{cmd, serviceType, "qps.limit"}, ".")
}
if serviceName != "" {
cmd = strings.Join([]string{cmd, serviceName}, ".")
if serviceName != "" {
keys.MicroServiceName = strings.Join([]string{Prefix, common.Consumer, "qps.limit", serviceName}, ".")
}
}
if schemaID != "" {
cmd = strings.Join([]string{cmd, schemaID}, ".")
keys.SchemaQualifiedName = strings.Join([]string{keys.MicroServiceName, schemaID}, ".")
}
if OperationID != "" {
cmd = strings.Join([]string{cmd, OperationID}, ".")
keys.OperationQualifiedName = strings.Join([]string{keys.SchemaQualifiedName, OperationID}, ".")
}
return cmd

return keys
}

// InitSchemaOperations initialize schema operations
func InitSchemaOperations(i *invocation.Invocation) *OperationMeta {
opMeta := new(OperationMeta)

opMeta.MicroServiceName = GetSpecificKey(i.SourceMicroService, "Consumer", i.MicroServiceName, "", "")
opMeta.SchemaQualifiedName = GetSpecificKey(i.SourceMicroService, "Consumer", i.MicroServiceName, i.SchemaID, "")
opMeta.OperationQualifiedName = GetSpecificKey(i.SourceMicroService, "Consumer", i.MicroServiceName, i.SchemaID, i.OperationID)
// GetProviderKey get specific key for provider
func GetProviderKey(sourceServiceName string) *ProviderKeys {
keys := &ProviderKeys{}
if sourceServiceName != "" {
keys.ServiceOriented = strings.Join([]string{Prefix, common.Provider, "qps.limit", sourceServiceName}, ".")
}

//for mesher server side rate limit
//as a proxy,mesher handler request from instances that belong to different ms
return opMeta
keys.Global = strings.Join([]string{Prefix, common.Provider, "qps.global.limit"}, ".")
return keys
}

// GetSchemaQualifiedName get schema qualified name
func (op *OperationMeta) GetSchemaQualifiedName() string {
func (op *ConsumerKeys) GetSchemaQualifiedName() string {
return op.SchemaQualifiedName
}

// GetMicroServiceSchemaOpQualifiedName get micro-service schema operation qualified name
func (op *OperationMeta) GetMicroServiceSchemaOpQualifiedName() string {
func (op *ConsumerKeys) GetMicroServiceSchemaOpQualifiedName() string {
return op.OperationQualifiedName
}

// GetMicroServiceName get micro-service name
func (op *OperationMeta) GetMicroServiceName() string {
func (op *ConsumerKeys) GetMicroServiceName() string {
return op.MicroServiceName
}
20 changes: 10 additions & 10 deletions core/qpslimiter/consumer_qps_operation_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ import (
"testing"
)

func TestInitSchemaOperations(t *testing.T) {
func TestGetConsumerKey(t *testing.T) {
i := &invocation.Invocation{
MicroServiceName: "service1",
SchemaID: "schema1",
OperationID: "SayHello",
Args: &helloworld.HelloRequest{Name: "peter"},
}

opMeta := qpslimiter.InitSchemaOperations(i)
t.Log("initializing schemaoperation from invocation object, OperationMeta = ", *opMeta)
sName := opMeta.GetMicroServiceName()
opMeta := qpslimiter.GetConsumerKey(i.SourceMicroService, i.MicroServiceName, i.SchemaID, i.OperationID)
t.Log("initializing schemaoperation from invocation object, ConsumerKeys = ", *opMeta)
sName := opMeta.MicroServiceName
assert.Equal(t, "cse.flowcontrol.Consumer.qps.limit.service1", sName)

schemaOpeartionName := opMeta.GetMicroServiceSchemaOpQualifiedName()
assert.Equal(t, "cse.flowcontrol.Consumer.qps.limit.service1.schema1.SayHello", schemaOpeartionName)
schemaOperationName := opMeta.OperationQualifiedName
assert.Equal(t, "cse.flowcontrol.Consumer.qps.limit.service1.schema1.SayHello", schemaOperationName)

schemaName := opMeta.GetSchemaQualifiedName()
schemaName := opMeta.SchemaQualifiedName
assert.Equal(t, "cse.flowcontrol.Consumer.qps.limit.service1.schema1", schemaName)

}
func TestInitSchemaOperations4Mesher(t *testing.T) {
func TestGetConsumerKey2(t *testing.T) {
i := &invocation.Invocation{
SourceMicroService: "client:1.1:sock",
MicroServiceName: "service1",
Expand All @@ -38,8 +38,8 @@ func TestInitSchemaOperations4Mesher(t *testing.T) {
Args: &helloworld.HelloRequest{Name: "peter"},
}

opMeta := qpslimiter.InitSchemaOperations(i)
t.Log("initializing schemaoperation from invocation object with sourceMicroserviceName, OperationMeta = ", *opMeta)
opMeta := qpslimiter.GetConsumerKey(i.SourceMicroService, i.MicroServiceName, i.SchemaID, i.OperationID)
t.Log("initializing schemaoperation from invocation object with sourceMicroserviceName, ConsumerKeys = ", *opMeta)
sName := opMeta.GetMicroServiceName()
assert.Equal(t, "cse.flowcontrol.client:1.1:sock.Consumer.qps.limit.service1", sName)

Expand Down
25 changes: 7 additions & 18 deletions core/qpslimiter/qps_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,30 +90,19 @@ func GetQPSRate(rateConfig string) (int, bool) {
}

// GetQPSRateWithPriority get qps rate with priority
func (qpsL *QPSLimiterMap) GetQPSRateWithPriority(op *OperationMeta) (int, string) {
func (qpsL *QPSLimiterMap) GetQPSRateWithPriority(cmd ...string) (int, string) {
var (
key string
qpsVal int
configExist bool
)
key = op.GetMicroServiceSchemaOpQualifiedName()
qpsVal, configExist = GetQPSRate(key)
if configExist {
return qpsVal, op.GetMicroServiceSchemaOpQualifiedName()
}

key = op.GetSchemaQualifiedName()
qpsVal, configExist = GetQPSRate(key)
if configExist {
return qpsVal, op.GetSchemaQualifiedName()
for _, c := range cmd {
qpsVal, configExist = GetQPSRate(c)
if configExist {
return qpsVal, c
}
}

key = op.GetMicroServiceName()
qpsVal, configExist = GetQPSRate(key)
if configExist {
return qpsVal, op.GetMicroServiceName()
}
return DefaultRate, op.GetMicroServiceName()
return DefaultRate, cmd[len(cmd)-1]

}

Expand Down
15 changes: 11 additions & 4 deletions core/qpslimiter/qps_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ func TestGetQpsRateWithPriority(t *testing.T) {
Args: &helloworld.HelloRequest{Name: "peter"},
}

opMeta := qpslimiter.InitSchemaOperations(i)
opMeta := qpslimiter.GetConsumerKey(i.SourceMicroService, i.MicroServiceName, i.SchemaID, i.OperationID)

qps := qpslimiter.GetQPSTrafficLimiter()
rate, key := qps.GetQPSRateWithPriority(opMeta)
log.Println("rate is :", rate)
assert.Equal(t, key, "cse.flowcontrol.Consumer.qps.limit.service1")
rate, key := qps.GetQPSRateWithPriority(opMeta.OperationQualifiedName, opMeta.SchemaQualifiedName, opMeta.MicroServiceName)
t.Log("rate is :", rate)
assert.Equal(t, "cse.flowcontrol.Consumer.qps.limit.service1", key)

i = &invocation.Invocation{
MicroServiceName: "service1",
}
keys := qpslimiter.GetProviderKey(i.SourceMicroService)
rate, key = qps.GetQPSRateWithPriority(keys.ServiceOriented, keys.Global)
assert.Equal(t, "cse.flowcontrol.Provider.qps.global.limit", key)
}

func TestUpdateRateLimit(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions docs/user-guides/rate-limiting.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ cse:
```yaml
cse:
flowcontrol
flowcontrol:
Provider:
qps:
enabled: true # enable rate limiting or not
global:
limit: 100 # default limit of provider
limit:
Server: 100 # rate limit for request from a provider
Client: 100 # rate limit for request from a consumer
```
#### Consumer示例
Expand Down
14 changes: 11 additions & 3 deletions examples/discovery/server/conf/chassis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ region:
name: us-east
availableZone: us-east-1
cse:
flowcontrol:
Provider:
qps:
enabled: true # enable rate limiting or not
global:
limit: 100 # default limit of provider
limit:
benchmark: 200 # rate limit for request from a consumer
loadbalance:
strategy:
name: RoundRobin
Expand Down Expand Up @@ -45,9 +53,9 @@ cse:
listenAddress: 127.0.0.1:8083
advertiseAddress: 127.0.0.1:8083
handler:
chain:
Provider:
default: tracing-provider
# chain:
# Provider:
# default: tracing-provider
# ssl:
# registry.Consumer.cipherPlugin: default
# registry.Consumer.verifyPeer: false
Expand Down

0 comments on commit f7b8308

Please sign in to comment.