Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle rate limit config update event #196

Merged
merged 100 commits into from Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from 89 commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
5b09deb
Merge pull request #1 from apache/develop
ztelur Apr 13, 2021
3ebd6a8
init
ztelur Apr 13, 2021
848409d
update dubbo-go to v1.5.6 & fmt sth.
mark4z Apr 15, 2021
17a701e
update
ztelur Apr 15, 2021
8133665
fix fmt
mark4z Apr 15, 2021
3dc4309
add gin
ztelur Apr 16, 2021
b13baa0
baseinfo and resources
ztelur Apr 20, 2021
980669a
Merge branch 'develop' into update_dubbo
mark4z Apr 21, 2021
67d98da
Update go.mod
mark4z Apr 22, 2021
f3f2437
update dubbo-go to v1.5.6 & fmt sth.
mark4z Apr 22, 2021
8d76eeb
api complete
ztelur Apr 25, 2021
c78093b
import
ztelur Apr 25, 2021
635acde
import
ztelur Apr 25, 2021
44543c4
update dubbo-go to v1.5.6 & fmt sth.
mark4z Apr 15, 2021
bca62b8
fix fmt
mark4z Apr 15, 2021
0547c78
Merge branch 'develop' into update_dubbo
mark4z Apr 28, 2021
2f8c7fd
update
ztelur Apr 30, 2021
0d4c14c
update
ztelur May 8, 2021
94fe2cc
update
ztelur May 8, 2021
2334b25
update
ztelur May 10, 2021
a49b509
replace path with id
ztelur May 12, 2021
f71f783
update
ztelur May 15, 2021
f387764
update
ztelur May 15, 2021
d807c83
Merge branch 'apache:develop' into develop
ztelur May 17, 2021
af4a56f
rateLimit filter
mark4z Mar 19, 2021
9b99a24
add licence header
mark4z May 17, 2021
c865a5d
fix imports
mark4z May 17, 2021
2e35f60
add license header
mark4z May 17, 2021
564a2d3
add license header
mark4z May 17, 2021
be02472
Merge branch 'develop' into ratelimit
kezhenxu94 May 17, 2021
2563b4d
fix ci
mark4z May 17, 2021
93363b7
Merge remote-tracking branch 'origin/ratelimit' into ratelimit
mark4z May 17, 2021
539bf9a
update
ztelur May 17, 2021
1875ea1
Merge branch 'develop' into admin
ztelur May 17, 2021
5415ef9
update
ztelur May 17, 2021
3faf20d
comment
ztelur May 17, 2021
bba6b4b
use go 1.15 run ci to fix ci problem
ztelur May 17, 2021
90836a4
Merge branch 'develop' into ratelimit
mark4z May 18, 2021
da69b51
fix matcher var inline
mark4z May 18, 2021
49015e4
Merge remote-tracking branch 'origin/ratelimit' into ratelimit
mark4z May 18, 2021
2dd2b8e
Merge branch 'develop' into update_dubbo
mark4z May 18, 2021
172ad82
fix ci
mark4z May 18, 2021
c73a7d0
fix sync.RWMutex
mark4z May 18, 2021
153e99c
errors.warp
mark4z May 18, 2021
02cb291
test table
mark4z May 18, 2021
619dab6
use gost zk
ztelur May 18, 2021
3c059df
add lock
ztelur May 19, 2021
15a66ed
remove unreachable statement
mark4z May 19, 2021
b9a7c17
Pattern pattern,omitempty
mark4z May 21, 2021
b435753
Merge branch 'develop' of https://github.com/apache/dubbo-go-pixiu in…
mark4z May 23, 2021
8a6dae2
update develop
mark4z May 23, 2021
9d5c3ae
modify import and chinese comment
ztelur May 26, 2021
79d0c3b
update the docs for ratelimit
mark4z May 28, 2021
602f8b2
update develop
mark4z May 29, 2021
c3a7da5
ratelimit filter
mark4z Jun 6, 2021
b93e6b1
ratelimit filter
mark4z Jun 6, 2021
5ef9f3a
ratelimit filter
mark4z Jun 6, 2021
f0e7b70
Merge branch 'develop' into update_dubbo
mark4z Jun 10, 2021
ee45a7b
ratelimit filter
mark4z Jun 10, 2021
bb18bb0
ratelimit filter
mark4z Jun 11, 2021
fd1abef
ratelimit filter
mark4z Jun 11, 2021
5f9e8ca
ratelimit filter
mark4z Jun 12, 2021
10d44e8
ratelimit filter
mark4z Jun 13, 2021
5d24779
Merge branch 'update_dubbo' into ratelimit
mark4z Jun 13, 2021
a4597ab
ratelimit filter
mark4z Jun 13, 2021
a482ff4
ratelimit filter
mark4z Jun 13, 2021
5429895
ratelimit filter
mark4z Jun 13, 2021
8af536e
ratelimit filter
mark4z Jun 13, 2021
685ce3a
split func and bugfix
ztelur Jun 14, 2021
9f6c153
Merge branch 'develop' into admin
ztelur Jun 14, 2021
a2bc557
fix go.mod go.sum
ztelur Jun 14, 2021
fc4a311
Merge branch 'apache:develop' into develop
ztelur Jun 14, 2021
948fd60
fix go.mod go.sum
ztelur Jun 14, 2021
550dacd
Merge branch 'develop' into admin
ztelur Jun 14, 2021
3fea7f3
fix go.mod go.sum
ztelur Jun 14, 2021
1a5d37b
ratelimit config change
mark4z Jun 14, 2021
7025c7f
Merge branch 'ratelimit' into admin
mark4z Jun 14, 2021
7064c9d
ratelimit config change
mark4z Jun 14, 2021
6047cb5
ratelimit config change
mark4z Jun 14, 2021
0c4b42f
ratelimit config change
mark4z Jun 14, 2021
b7e01aa
Merge branch 'develop' of https://github.com/apache/dubbo-go-pixiu in…
mark4z Jun 25, 2021
16eadb6
Merge branch 'develop' of https://github.com/apache/dubbo-go-pixiu in…
mark4z Jun 26, 2021
09bdadf
ratelimit config change
mark4z Jun 26, 2021
b1d81ba
Merge remote-tracking branch 'origin/admin' into admin
mark4z Jun 26, 2021
ccd46cf
ratelimit config change
mark4z Jun 26, 2021
6157310
ratelimit config change
mark4z Jun 26, 2021
0a391fb
ratelimit config change
mark4z Jun 26, 2021
b404495
update go.sum
mark4z Jun 27, 2021
d53a3ad
Merge remote-tracking branch 'origin/admin' into admin
mark4z Jun 27, 2021
18a5c11
fix logger fmt style
mark4z Jun 28, 2021
9653ee8
fix logger fmt style
mark4z Jun 28, 2021
445e8db
time delay when lazy load GenericService
mark4z Jun 29, 2021
dc32330
Merge remote-tracking branch 'origin/admin' into admin
mark4z Jun 29, 2021
9fb02a8
time delay when lazy load GenericService
mark4z Jun 29, 2021
a020a53
fix ci
mark4z Jun 29, 2021
19854ca
fix logger fmt style
mark4z Jun 29, 2021
795260e
err.Error() is not a good style
mark4z Jun 30, 2021
0a98f8e
Merge branch 'develop' into admin
kezhenxu94 Jun 30, 2021
0da1edc
Merge branch 'develop' of https://github.com/apache/dubbo-go-pixiu in…
mark4z Jul 1, 2021
4c68f88
fix ci
mark4z Jul 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 0 additions & 9 deletions cmd/pixiu/pixiu.go
Expand Up @@ -30,15 +30,6 @@ import (
"github.com/urfave/cli"
)

import (
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/accesslog"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/logger"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/recovery"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/remote"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/response"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/timeout"
)

// Version pixiu version
var Version = "0.3.0"

Expand Down
160 changes: 158 additions & 2 deletions pkg/config/api_config.go
Expand Up @@ -27,6 +27,7 @@ import (

import (
fc "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config"
fr "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config/ratelimit"
etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
Expand All @@ -47,6 +48,12 @@ var (
lock sync.RWMutex
)

var (
BASE_INFO_NAME = "name"
BASE_INFO_DESC = "description"
BASE_INFO_PFP = "pluginFilePath"
)

// APIConfigResourceListener defines api resource and method config listener interface
type APIConfigResourceListener interface {
// ResourceChange handle modify resource event
Expand All @@ -61,6 +68,12 @@ type APIConfigResourceListener interface {
MethodAdd(res fc.Resource, method fc.Method) bool
// MethodDelete handle delete method event
MethodDelete(res fc.Resource, method fc.Method) bool

PluginPathChange(filePath string)

PluginGroupChange(group []fc.PluginsGroup)

RateLimitChange(*fr.Config)
}

// LoadAPIConfigFromFile load the api config from file
Expand Down Expand Up @@ -105,11 +118,21 @@ func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) {

func initAPIConfigFromKVList(kList, vList []string) error {
var skList, svList, mkList, mvList []string
var baseInfo string
var pluginGroup []string
var rateLimit string

for i, k := range kList {
v := vList[i]
//handle base info
re := getCheckBaseInfoRegexp()
if m := re.Match([]byte(k)); m {
baseInfo = v
continue
}

// handle resource
re := getCheckResourceRegexp()
re = getCheckResourceRegexp()
if m := re.Match([]byte(k)); m {
skList = append(skList, k)
svList = append(svList, v)
Expand All @@ -122,12 +145,30 @@ func initAPIConfigFromKVList(kList, vList []string) error {
mvList = append(mvList, v)
continue
}

//handle plugin group
re = getCheckPluginsGroupRegexp()
if m := re.Match([]byte(k)); m {
pluginGroup = append(pluginGroup, v)
continue
}

//handle rate limit config
re = getCheckRatelimitRegexp()
if m := re.Match([]byte(k)); m {
rateLimit = v
continue
}
}

lock.Lock()
defer lock.Unlock()

tmpApiConf := &fc.APIConfig{}
if err := initBaseInfoFromString(tmpApiConf, baseInfo); err != nil {
logger.Error("initBaseInfoFromString error %v", err.Error())
return err
}
if err := initAPIConfigServiceFromKvList(tmpApiConf, skList, svList); err != nil {
logger.Error("initAPIConfigServiceFromKvList error %v", err.Error())
return err
Expand All @@ -136,11 +177,61 @@ func initAPIConfigFromKVList(kList, vList []string) error {
logger.Error("initAPIConfigMethodFromKvList error %v", err.Error())
return err
}
if err := initAPIConfigPluginsFromStringList(tmpApiConf, pluginGroup); err != nil {
logger.Error("initAPIConfigPluginsFromStringList error %v", err.Error())
return err
}
if err := initAPIConfigRatelimitFromString(tmpApiConf, rateLimit); err != nil {
logger.Error("initAPIConfigRatelimitFromString error %v", err.Error())
return err
}

apiConfig = tmpApiConf
return nil
}

func initBaseInfoFromString(conf *fc.APIConfig, str string) error {
properties := make(map[string]string, 8)
if err := yaml.UnmarshalYML([]byte(str), properties); err != nil {
logger.Errorf("unmarshalYmlConfig error %v", err.Error())
mark4z marked this conversation as resolved.
Show resolved Hide resolved
return err
}
if v, ok := properties[BASE_INFO_NAME]; ok {
conf.Name = v
}
if v, ok := properties[BASE_INFO_DESC]; ok {
conf.Description = v
}
if v, ok := properties[BASE_INFO_PFP]; ok {
conf.PluginFilePath = v
}
return nil
}

func initAPIConfigRatelimitFromString(conf *fc.APIConfig, str string) error {
c := fr.Config{}
if err := yaml.UnmarshalYML([]byte(str), &c); err != nil {
logger.Errorf("unmarshalYmlConfig error %v", err.Error())
return err
}
conf.RateLimit = c
return nil
}

func initAPIConfigPluginsFromStringList(conf *fc.APIConfig, plugins []string) error {
var groups []fc.PluginsGroup
for _, v := range plugins {
g := fc.PluginsGroup{}
if err := yaml.UnmarshalYML([]byte(v), &g); err != nil {
logger.Errorf("unmarshalYmlConfig error %v", err.Error())
return err
}
groups = append(groups, g)
}
conf.PluginsGroup = groups
return nil
}

func initAPIConfigMethodFromKvList(config *fc.APIConfig, kList, vList []string) error {
for i, _ := range kList {
v := vList[i]
Expand Down Expand Up @@ -304,6 +395,12 @@ func handleDeleteEvent(key, val []byte) {
}
deleteApiConfigMethod(resourceId, methodId)
}

re = getCheckRatelimitRegexp()
if m := re.Match(key); m {
empty := &fr.Config{}
listener.RateLimitChange(empty)
}
}

func handlePutEvent(key, val []byte) {
Expand Down Expand Up @@ -332,6 +429,27 @@ func handlePutEvent(key, val []byte) {
}
mergeApiConfigMethod(res.ResourcePath, *res)
}

//handle base info
re = getCheckBaseInfoRegexp()
if m := re.Match(key); m {
mergeBaseInfo(val)
return
}

//handle plugins group
re = getCheckPluginsGroupRegexp()
if m := re.Match(key); m {
mergePluginGroup(val)
return
}

//handle ratelimit
re = getCheckRatelimitRegexp()
if m := re.Match(key); m {
mergeRatelimit(val)
return
}
}

func deleteApiConfigResource(resourceId int) {
Expand Down Expand Up @@ -361,6 +479,36 @@ func mergeApiConfigResource(val fc.Resource) {
listener.ResourceAdd(val)
}

func mergeRatelimit(val []byte) {
c := &fr.Config{}
if err := yaml.UnmarshalYML(val, c); err != nil {
logger.Errorf("unmarshalYmlConfig error %v", err.Error())
return
}
apiConfig.RateLimit = *c
listener.RateLimitChange(c)
}

func mergePluginGroup(val []byte) {
g := &fc.PluginsGroup{}
if err := yaml.UnmarshalYML(val, g); err != nil {
logger.Errorf("unmarshalYmlConfig error %v", err.Error())
return
}
for i, v := range apiConfig.PluginsGroup {
if v.GroupName == g.GroupName {
apiConfig.PluginsGroup[i] = *g
}
}
listener.PluginGroupChange(apiConfig.PluginsGroup)
}

func mergeBaseInfo(val []byte) {
_ = initBaseInfoFromString(apiConfig, string(val))

listener.PluginPathChange(apiConfig.PluginFilePath)
}

func deleteApiConfigMethod(resourceId, methodId int) {
for _, resource := range apiConfig.Resources {
if resource.ID != resourceId {
Expand Down Expand Up @@ -402,7 +550,7 @@ func mergeApiConfigMethod(path string, val fc.Method) {
}

func getCheckBaseInfoRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/base$")
return regexp.MustCompile(".+/Base$")
}

func getCheckResourceRegexp() *regexp.Regexp {
Expand All @@ -413,6 +561,14 @@ func getExtractMethodRegexp() *regexp.Regexp {
return regexp.MustCompile("Resources/([^/]+)/Method/[^/]+/?$")
}

func getCheckPluginsGroupRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/PluginGroup/[^/]+/?$")
}

func getCheckRatelimitRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/Ratelimit/[^/]+/?$")
}

// RegisterConfigListener register APIConfigListener
func RegisterConfigListener(li APIConfigResourceListener) {
listener = li
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/accesslog/access_log.go
Expand Up @@ -40,7 +40,7 @@ import (

var accessLogWriter = &model.AccessLogWriter{AccessLogDataChan: make(chan model.AccessLogData, constant.LogDataBuffer)}

func init() {
func Init() {
extension.SetFilterFunc(constant.AccessLogFilter, accessLog())
accessLogWriter.Write()
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/filter/plugins/plugins.go
Expand Up @@ -38,6 +38,7 @@ var (
// url path -> filter chain
filterChainCache = make(map[string]FilterChain)
groupCache = make(map[string]map[string]WithFunc)
localFilePath = ""

errEmptyConfig = errors.New("Empty plugin config")
)
Expand All @@ -60,24 +61,33 @@ type WithFunc struct {
fn context.FilterFunc
}

func OnFilePathChange(filePath string) {
if len(filePath) == 0 {
return
}
localFilePath = filePath
}

// OnResourceUpdate update plugins cache map when api-resource update
func OnResourceUpdate(resource *config.Resource) {
InitFilterChainForResource(resource, "", nil)
}

// OnGroupUpdate update group cache
func OnGroupUpdate(groups []config.PluginsGroup, filePath string) {
InitPluginsGroup(groups, filePath)
func OnGroupUpdate(groups []config.PluginsGroup) {
InitPluginsGroup(groups, "")
}

// InitPluginsGroup prase api_config.yaml(pluginsGroup) to map[string][]PluginsWithFunc
func InitPluginsGroup(groups []config.PluginsGroup, filePath string) {
if "" == filePath || len(groups) == 0 {
OnFilePathChange(filePath)

if "" == localFilePath || len(groups) == 0 {
return
}

// load file.so
pls, err := plugin.Open(filePath)
pls, err := plugin.Open(localFilePath)
if nil != err {
panic(err)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/initialize/init.go
Expand Up @@ -18,6 +18,7 @@
package initialize

import (
"github.com/apache/dubbo-go-pixiu/pkg/filter/accesslog"
"github.com/apache/dubbo-go-pixiu/pkg/filter/api"
"github.com/apache/dubbo-go-pixiu/pkg/filter/authority"
"github.com/apache/dubbo-go-pixiu/pkg/filter/logger"
Expand All @@ -41,6 +42,7 @@ func Run(config config.APIConfig) {
}

func filterInit(config *config.APIConfig) {
accesslog.Init()
api.Init()
authority.Init()
logger.Init()
Expand Down
15 changes: 15 additions & 0 deletions pkg/service/api/discovery_service.go
Expand Up @@ -27,12 +27,15 @@ import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension"
pc "github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/filter/plugins"
"github.com/apache/dubbo-go-pixiu/pkg/filter/ratelimit"
"github.com/apache/dubbo-go-pixiu/pkg/router"
"github.com/apache/dubbo-go-pixiu/pkg/service"
)

import (
"github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config"
ratelimitConf "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config/ratelimit"
fr "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/router"
)

Expand Down Expand Up @@ -269,3 +272,15 @@ func loadAPIFromMethods(fullPath string, methods []config.Method, headers map[st
}
return nil
}

func (ads *LocalMemoryAPIDiscoveryService) PluginPathChange(filePath string) {
mark4z marked this conversation as resolved.
Show resolved Hide resolved
plugins.OnFilePathChange(filePath)
}

func (ads *LocalMemoryAPIDiscoveryService) PluginGroupChange(group []config.PluginsGroup) {
plugins.OnGroupUpdate(group)
}

func (ads *LocalMemoryAPIDiscoveryService) RateLimitChange(c *ratelimitConf.Config) {
ratelimit.OnUpdate(c)
}