Skip to content

Commit

Permalink
[ISSUE #2123] Feat/polaris ratelimit (#2128)
Browse files Browse the repository at this point in the history
* feat:support polaris tps limiter

* feat:support polaris tps limiter

* feat:support polaris limit ability

* feat:support polaris limit ability

* feat:support polaris limit ability

* feat:support polaris limit ability
  • Loading branch information
chuntaojun committed Nov 21, 2022
1 parent d70b6c0 commit 40ce3ba
Show file tree
Hide file tree
Showing 11 changed files with 539 additions and 15 deletions.
12 changes: 0 additions & 12 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,6 @@ const (
NacosUpdateCacheWhenEmpty = "nacos.updateCacheWhenEmpty"
)

const (
PolarisKey = "polaris"
PolarisDefaultRoleType = 3
PolarisServiceToken = "token"
PolarisServiceNameSeparator = ":"
PolarisDubboPath = "DUBBOPATH"
PolarisInstanceID = "polaris.instanceID"
PolarisDefaultNamespace = "default"
PolarisDubboGroup = "dubbo.group"
PolarisClientName = "polaris-client"
)

const (
FileKey = "file"
)
Expand Down
34 changes: 34 additions & 0 deletions common/constant/polaris_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constant

const (
PolarisKey = "polaris"
PolarisDefaultRoleType = 3
PolarisServiceToken = "token"
PolarisServiceNameSeparator = ":"
PolarisDubboPath = "DUBBOPATH"
PolarisInstanceID = "polaris.instanceID"
PolarisDefaultNamespace = "default"
PolarisDubboGroup = "dubbo.group"
PolarisClientName = "polaris-client"
)

const (
PluginPolarisTpsLimiter = "polaris-limit"
)
1 change: 1 addition & 0 deletions filter/filter_impl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/filter/graceful_shutdown"
_ "dubbo.apache.org/dubbo-go/v3/filter/hystrix"
_ "dubbo.apache.org/dubbo-go/v3/filter/metrics"
_ "dubbo.apache.org/dubbo-go/v3/filter/polaris/limit"
_ "dubbo.apache.org/dubbo-go/v3/filter/seata"
_ "dubbo.apache.org/dubbo-go/v3/filter/sentinel"
_ "dubbo.apache.org/dubbo-go/v3/filter/token"
Expand Down
30 changes: 30 additions & 0 deletions filter/polaris/limit/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package limit

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
)

func init() {
extension.SetTpsLimiter(constant.PluginPolarisTpsLimiter, func() filter.TpsLimiter {
return &polarisTpsLimiter{}
})
}
167 changes: 167 additions & 0 deletions filter/polaris/limit/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package limit

import (
"fmt"
"time"
)

import (
"github.com/dubbogo/gost/log/logger"

"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/flow/data"
"github.com/polarismesh/polaris-go/pkg/model"
v1 "github.com/polarismesh/polaris-go/pkg/model/pb/v1"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
remotingpolaris "dubbo.apache.org/dubbo-go/v3/remoting/polaris"
"dubbo.apache.org/dubbo-go/v3/remoting/polaris/parser"
)

type polarisTpsLimiter struct {
limitApi polaris.LimitAPI
}

func (pl *polarisTpsLimiter) IsAllowable(url *common.URL, invocation protocol.Invocation) bool {
var err error

pl.limitApi, err = remotingpolaris.GetLimiterAPI()
if err != nil {
logger.Error("[TpsLimiter][Polaris] create polaris LimitAPI fail : %+v", err)
return true
}

req := pl.buildQuotaRequest(url, invocation)
if req == nil {
return true
}
logger.Debugf("[TpsLimiter][Polaris] quota req : %+v", req)

resp, err := pl.limitApi.GetQuota(req)
if err != nil {
logger.Error("[TpsLimiter][Polaris] ns:%s svc:%s get quota fail : %+v", remotingpolaris.GetNamespace(), url.Service(), err)
return true
}

return resp.Get().Code == model.QuotaResultOk
}

func (pl *polarisTpsLimiter) buildQuotaRequest(url *common.URL, invoaction protocol.Invocation) polaris.QuotaRequest {
ns := remotingpolaris.GetNamespace()
applicationMode := false
for _, item := range config.GetRootConfig().Registries {
if item.Protocol == constant.PolarisKey {
applicationMode = item.RegistryType == constant.ServiceKey
}
}

svc := "providers:" + url.Service()
method := invoaction.MethodName()
if applicationMode {
svc = config.GetApplicationConfig().Name
method = url.Interface() + "/" + invoaction.MethodName()
}

req := polaris.NewQuotaRequest()
req.SetNamespace(ns)
req.SetService(svc)
req.SetMethod(method)

matchs, ok := pl.buildArguments(req.(*model.QuotaRequestImpl))
if !ok {
return nil
}

attachement := invoaction.Attachments()
arguments := invoaction.Arguments()

for i := range matchs {
item := matchs[i]
switch item.GetType() {
case v1.MatchArgument_HEADER:
if val, ok := attachement[item.GetKey()]; ok {
req.AddArgument(model.BuildHeaderArgument(item.GetKey(), fmt.Sprintf("%+v", val)))
}
case v1.MatchArgument_QUERY:
if val := parser.ParseArgumentsByExpression(item.GetKey(), arguments); val != nil {
req.AddArgument(model.BuildQueryArgument(item.GetKey(), fmt.Sprintf("%+v", val)))
}
case v1.MatchArgument_CALLER_IP:
callerIp := url.GetParam(constant.RemoteAddr, "")
if len(callerIp) != 0 {
req.AddArgument(model.BuildCallerIPArgument(callerIp))
}
case model.ArgumentTypeCallerService:
}
}

return req
}

func (pl *polarisTpsLimiter) buildArguments(req *model.QuotaRequestImpl) ([]*v1.MatchArgument, bool) {
engine := pl.limitApi.SDKContext().GetEngine()

getRuleReq := &data.CommonRateLimitRequest{
DstService: model.ServiceKey{
Namespace: req.GetNamespace(),
Service: req.GetService(),
},
Trigger: model.NotifyTrigger{
EnableDstRateLimit: true,
},
ControlParam: model.ControlParam{
Timeout: time.Millisecond * 500,
},
}

if err := engine.SyncGetResources(getRuleReq); err != nil {
logger.Error("[TpsLimiter][Polaris] ns:%s svc:%s get RateLimit Rule fail : %+v", req.GetNamespace(), req.GetService(), err)
return nil, false
}

svcRule := getRuleReq.RateLimitRule
if svcRule == nil || svcRule.GetValue() == nil {
logger.Warnf("[TpsLimiter][Polaris] ns:%s svc:%s get RateLimit Rule is nil", req.GetNamespace(), req.GetService())
return nil, false
}

rules, ok := svcRule.GetValue().(*v1.RateLimit)
if !ok {
logger.Error("[TpsLimiter][Polaris] ns:%s svc:%s get RateLimit Rule invalid", req.GetNamespace(), req.GetService())
return nil, false
}

ret := make([]*v1.MatchArgument, 0, 4)
for i := range rules.GetRules() {
rule := rules.GetRules()[i]
if len(rule.GetArguments()) == 0 {
continue
}

ret = append(ret, rule.Arguments...)
}

return ret, true
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go/v2 v2.1.2
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/polarismesh/polaris-go v1.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,8 @@ github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtb
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
Expand Down
1 change: 1 addition & 0 deletions imports/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/filter/hystrix"
_ "dubbo.apache.org/dubbo-go/v3/filter/metrics"
_ "dubbo.apache.org/dubbo-go/v3/filter/otel/trace"
_ "dubbo.apache.org/dubbo-go/v3/filter/polaris/limit"
_ "dubbo.apache.org/dubbo-go/v3/filter/seata"
_ "dubbo.apache.org/dubbo-go/v3/filter/sentinel"
_ "dubbo.apache.org/dubbo-go/v3/filter/token"
Expand Down
13 changes: 10 additions & 3 deletions remoting/polaris/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ import (

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
)

var (
once sync.Once
sdkCtx api.SDKContext
once sync.Once
namesapce string
sdkCtx api.SDKContext
)

var (
Expand Down Expand Up @@ -83,6 +85,11 @@ func GetLimiterAPI() (polaris.LimitAPI, error) {
return polaris.NewLimitAPIByContext(sdkCtx), nil
}

// GetNamespace gets user defined namespace info
func GetNamespace() string {
return namesapce
}

// InitSDKContext inits polaris SDKContext by URL
func InitSDKContext(url *common.URL) error {
if url == nil {
Expand All @@ -91,7 +98,6 @@ func InitSDKContext(url *common.URL) error {

var rerr error
once.Do(func() {

addresses := strings.Split(url.Location, ",")
serverConfigs := make([]string, 0, len(addresses))
for _, addr := range addresses {
Expand All @@ -107,6 +113,7 @@ func InitSDKContext(url *common.URL) error {
_sdkCtx, err := api.InitContextByConfig(polarisConf)
rerr = err
sdkCtx = _sdkCtx
namesapce = url.GetParam(constant.RegistryNamespaceKey, constant.PolarisDefaultNamespace)
})

return rerr
Expand Down

0 comments on commit 40ce3ba

Please sign in to comment.