Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle rate limit config update event #196

Merged
merged 100 commits into from Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from 97 commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
5b09deb
Merge pull request #1 from apache/develop
ztelur Apr 13, 2021
3ebd6a8
init
ztelur Apr 13, 2021
848409d
update dubbo-go to v1.5.6 & fmt sth.
mark4z Apr 15, 2021
17a701e
update
ztelur Apr 15, 2021
8133665
fix fmt
mark4z Apr 15, 2021
3dc4309
add gin
ztelur Apr 16, 2021
b13baa0
baseinfo and resources
ztelur Apr 20, 2021
980669a
Merge branch 'develop' into update_dubbo
mark4z Apr 21, 2021
67d98da
Update go.mod
mark4z Apr 22, 2021
f3f2437
update dubbo-go to v1.5.6 & fmt sth.
mark4z Apr 22, 2021
8d76eeb
api complete
ztelur Apr 25, 2021
c78093b
import
ztelur Apr 25, 2021
635acde
import
ztelur Apr 25, 2021
44543c4
update dubbo-go to v1.5.6 & fmt sth.
mark4z Apr 15, 2021
bca62b8
fix fmt
mark4z Apr 15, 2021
0547c78
Merge branch 'develop' into update_dubbo
mark4z Apr 28, 2021
2f8c7fd
update
ztelur Apr 30, 2021
0d4c14c
update
ztelur May 8, 2021
94fe2cc
update
ztelur May 8, 2021
2334b25
update
ztelur May 10, 2021
a49b509
replace path with id
ztelur May 12, 2021
f71f783
update
ztelur May 15, 2021
f387764
update
ztelur May 15, 2021
d807c83
Merge branch 'apache:develop' into develop
ztelur May 17, 2021
af4a56f
rateLimit filter
mark4z Mar 19, 2021
9b99a24
add licence header
mark4z May 17, 2021
c865a5d
fix imports
mark4z May 17, 2021
2e35f60
add license header
mark4z May 17, 2021
564a2d3
add license header
mark4z May 17, 2021
be02472
Merge branch 'develop' into ratelimit
kezhenxu94 May 17, 2021
2563b4d
fix ci
mark4z May 17, 2021
93363b7
Merge remote-tracking branch 'origin/ratelimit' into ratelimit
mark4z May 17, 2021
539bf9a
update
ztelur May 17, 2021
1875ea1
Merge branch 'develop' into admin
ztelur May 17, 2021
5415ef9
update
ztelur May 17, 2021
3faf20d
comment
ztelur May 17, 2021
bba6b4b
use go 1.15 run ci to fix ci problem
ztelur May 17, 2021
90836a4
Merge branch 'develop' into ratelimit
mark4z May 18, 2021
da69b51
fix matcher var inline
mark4z May 18, 2021
49015e4
Merge remote-tracking branch 'origin/ratelimit' into ratelimit
mark4z May 18, 2021
2dd2b8e
Merge branch 'develop' into update_dubbo
mark4z May 18, 2021
172ad82
fix ci
mark4z May 18, 2021
c73a7d0
fix sync.RWMutex
mark4z May 18, 2021
153e99c
errors.warp
mark4z May 18, 2021
02cb291
test table
mark4z May 18, 2021
619dab6
use gost zk
ztelur May 18, 2021
3c059df
add lock
ztelur May 19, 2021
15a66ed
remove unreachable statement
mark4z May 19, 2021
b9a7c17
Pattern pattern,omitempty
mark4z May 21, 2021
b435753
Merge branch 'develop' of https://github.com/apache/dubbo-go-pixiu in…
mark4z May 23, 2021
8a6dae2
update develop
mark4z May 23, 2021
9d5c3ae
modify import and chinese comment
ztelur May 26, 2021
79d0c3b
update the docs for ratelimit
mark4z May 28, 2021
602f8b2
update develop
mark4z May 29, 2021
c3a7da5
ratelimit filter
mark4z Jun 6, 2021
b93e6b1
ratelimit filter
mark4z Jun 6, 2021
5ef9f3a
ratelimit filter
mark4z Jun 6, 2021
f0e7b70
Merge branch 'develop' into update_dubbo
mark4z Jun 10, 2021
ee45a7b
ratelimit filter
mark4z Jun 10, 2021
bb18bb0
ratelimit filter
mark4z Jun 11, 2021
fd1abef
ratelimit filter
mark4z Jun 11, 2021
5f9e8ca
ratelimit filter
mark4z Jun 12, 2021
10d44e8
ratelimit filter
mark4z Jun 13, 2021
5d24779
Merge branch 'update_dubbo' into ratelimit
mark4z Jun 13, 2021
a4597ab
ratelimit filter
mark4z Jun 13, 2021
a482ff4
ratelimit filter
mark4z Jun 13, 2021
5429895
ratelimit filter
mark4z Jun 13, 2021
8af536e
ratelimit filter
mark4z Jun 13, 2021
685ce3a
split func and bugfix
ztelur Jun 14, 2021
9f6c153
Merge branch 'develop' into admin
ztelur Jun 14, 2021
a2bc557
fix go.mod go.sum
ztelur Jun 14, 2021
fc4a311
Merge branch 'apache:develop' into develop
ztelur Jun 14, 2021
948fd60
fix go.mod go.sum
ztelur Jun 14, 2021
550dacd
Merge branch 'develop' into admin
ztelur Jun 14, 2021
3fea7f3
fix go.mod go.sum
ztelur Jun 14, 2021
1a5d37b
ratelimit config change
mark4z Jun 14, 2021
7025c7f
Merge branch 'ratelimit' into admin
mark4z Jun 14, 2021
7064c9d
ratelimit config change
mark4z Jun 14, 2021
6047cb5
ratelimit config change
mark4z Jun 14, 2021
0c4b42f
ratelimit config change
mark4z Jun 14, 2021
b7e01aa
Merge branch 'develop' of https://github.com/apache/dubbo-go-pixiu in…
mark4z Jun 25, 2021
16eadb6
Merge branch 'develop' of https://github.com/apache/dubbo-go-pixiu in…
mark4z Jun 26, 2021
09bdadf
ratelimit config change
mark4z Jun 26, 2021
b1d81ba
Merge remote-tracking branch 'origin/admin' into admin
mark4z Jun 26, 2021
ccd46cf
ratelimit config change
mark4z Jun 26, 2021
6157310
ratelimit config change
mark4z Jun 26, 2021
0a391fb
ratelimit config change
mark4z Jun 26, 2021
b404495
update go.sum
mark4z Jun 27, 2021
d53a3ad
Merge remote-tracking branch 'origin/admin' into admin
mark4z Jun 27, 2021
18a5c11
fix logger fmt style
mark4z Jun 28, 2021
9653ee8
fix logger fmt style
mark4z Jun 28, 2021
445e8db
time delay when lazy load GenericService
mark4z Jun 29, 2021
dc32330
Merge remote-tracking branch 'origin/admin' into admin
mark4z Jun 29, 2021
9fb02a8
time delay when lazy load GenericService
mark4z Jun 29, 2021
a020a53
fix ci
mark4z Jun 29, 2021
19854ca
fix logger fmt style
mark4z Jun 29, 2021
795260e
err.Error() is not a good style
mark4z Jun 30, 2021
0a98f8e
Merge branch 'develop' into admin
kezhenxu94 Jun 30, 2021
0da1edc
Merge branch 'develop' of https://github.com/apache/dubbo-go-pixiu in…
mark4z Jul 1, 2021
4c68f88
fix ci
mark4z Jul 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 0 additions & 9 deletions cmd/pixiu/pixiu.go
Expand Up @@ -30,15 +30,6 @@ import (
"github.com/urfave/cli"
)

import (
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/accesslog"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/logger"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/recovery"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/remote"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/response"
_ "github.com/apache/dubbo-go-pixiu/pkg/filter/timeout"
)

// Version pixiu version
var Version = "0.3.0"

Expand Down
4 changes: 3 additions & 1 deletion pkg/client/dubbo/dubbo.go
Expand Up @@ -260,7 +260,9 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *dg.Generic
dc.lock.Lock()
defer dc.lock.Unlock()
referenceConfig.GenericLoad(key)
time.Sleep(200 * time.Millisecond) // sleep to wait invoker create
//TODO: fix it later
// sleep to wait invoker create
time.Sleep(500 * time.Millisecond)
clientService := referenceConfig.GetRPCService().(*dg.GenericService)

dc.GenericServicePool[key] = clientService
Expand Down
188 changes: 172 additions & 16 deletions pkg/config/api_config.go
Expand Up @@ -27,6 +27,7 @@ import (

import (
fc "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config"
fr "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config/ratelimit"
etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3"
perrors "github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
Expand All @@ -47,6 +48,12 @@ var (
lock sync.RWMutex
)

var (
BASE_INFO_NAME = "name"
BASE_INFO_DESC = "description"
BASE_INFO_PFP = "pluginFilePath"
)

// APIConfigResourceListener defines api resource and method config listener interface
type APIConfigResourceListener interface {
// ResourceChange handle modify resource event
Expand All @@ -61,6 +68,12 @@ type APIConfigResourceListener interface {
MethodAdd(res fc.Resource, method fc.Method) bool
// MethodDelete handle delete method event
MethodDelete(res fc.Resource, method fc.Method) bool

PluginPathChange(filePath string)

PluginGroupChange(group []fc.PluginsGroup)

RateLimitChange(*fr.Config)
}

// LoadAPIConfigFromFile load the api config from file
Expand All @@ -72,7 +85,7 @@ func LoadAPIConfigFromFile(path string) (*fc.APIConfig, error) {
apiConf := &fc.APIConfig{}
err := yaml.UnmarshalYMLConfig(path, apiConf)
if err != nil {
return nil, perrors.Errorf("unmarshalYmlConfig error %v", perrors.WithStack(err))
return nil, perrors.Errorf("unmarshalYmlConfig error %s", perrors.WithStack(err))
}
apiConfig = apiConf
return apiConf, nil
Expand All @@ -86,13 +99,13 @@ func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) {
etcdv3.WithEndpoints(strings.Split(metaConfig.Address, ",")...),
)
if err != nil {
return nil, perrors.Errorf("Init etcd client fail error %v", err)
return nil, perrors.Errorf("Init etcd client fail error %s", err)
}

client = tmpClient
kList, vList, err := client.GetChildren(metaConfig.APIConfigPath)
if err != nil {
return nil, perrors.Errorf("Get remote config fail error %v", err)
return nil, perrors.Errorf("Get remote config fail error %s", err)
}
if err = initAPIConfigFromKVList(kList, vList); err != nil {
return nil, err
Expand All @@ -105,11 +118,21 @@ func LoadAPIConfig(metaConfig *model.APIMetaConfig) (*fc.APIConfig, error) {

func initAPIConfigFromKVList(kList, vList []string) error {
var skList, svList, mkList, mvList []string
var baseInfo string
var pluginGroup []string
var rateLimit string

for i, k := range kList {
v := vList[i]
//handle base info
re := getCheckBaseInfoRegexp()
if m := re.Match([]byte(k)); m {
baseInfo = v
continue
}

// handle resource
re := getCheckResourceRegexp()
re = getCheckResourceRegexp()
if m := re.Match([]byte(k)); m {
skList = append(skList, k)
svList = append(svList, v)
Expand All @@ -122,32 +145,100 @@ func initAPIConfigFromKVList(kList, vList []string) error {
mvList = append(mvList, v)
continue
}

//handle plugin group
re = getCheckPluginsGroupRegexp()
if m := re.Match([]byte(k)); m {
pluginGroup = append(pluginGroup, v)
continue
}

//handle rate limit config
re = getCheckRatelimitRegexp()
if m := re.Match([]byte(k)); m {
rateLimit = v
continue
}
}

lock.Lock()
defer lock.Unlock()

tmpApiConf := &fc.APIConfig{}
if err := initBaseInfoFromString(tmpApiConf, baseInfo); err != nil {
logger.Errorf("initBaseInfoFromString error %s", err)
return err
}
if err := initAPIConfigServiceFromKvList(tmpApiConf, skList, svList); err != nil {
logger.Error("initAPIConfigServiceFromKvList error %v", err.Error())
logger.Errorf("initAPIConfigServiceFromKvList error %s", err)
return err
}
if err := initAPIConfigMethodFromKvList(tmpApiConf, mkList, mvList); err != nil {
logger.Error("initAPIConfigMethodFromKvList error %v", err.Error())
logger.Errorf("initAPIConfigMethodFromKvList error %s", err)
return err
}
if err := initAPIConfigPluginsFromStringList(tmpApiConf, pluginGroup); err != nil {
logger.Errorf("initAPIConfigPluginsFromStringList error %s", err)
return err
}
if err := initAPIConfigRatelimitFromString(tmpApiConf, rateLimit); err != nil {
logger.Errorf("initAPIConfigRatelimitFromString error %s", err)
return err
}

apiConfig = tmpApiConf
return nil
}

func initBaseInfoFromString(conf *fc.APIConfig, str string) error {
properties := make(map[string]string, 8)
if err := yaml.UnmarshalYML([]byte(str), properties); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}
if v, ok := properties[BASE_INFO_NAME]; ok {
conf.Name = v
}
if v, ok := properties[BASE_INFO_DESC]; ok {
conf.Description = v
}
if v, ok := properties[BASE_INFO_PFP]; ok {
conf.PluginFilePath = v
}
return nil
}

func initAPIConfigRatelimitFromString(conf *fc.APIConfig, str string) error {
c := fr.Config{}
if err := yaml.UnmarshalYML([]byte(str), &c); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}
conf.RateLimit = c
return nil
}

func initAPIConfigPluginsFromStringList(conf *fc.APIConfig, plugins []string) error {
var groups []fc.PluginsGroup
for _, v := range plugins {
g := fc.PluginsGroup{}
if err := yaml.UnmarshalYML([]byte(v), &g); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}
groups = append(groups, g)
}
conf.PluginsGroup = groups
return nil
}

func initAPIConfigMethodFromKvList(config *fc.APIConfig, kList, vList []string) error {
for i, _ := range kList {
v := vList[i]
method := &fc.Method{}
err := yaml.UnmarshalYML([]byte(v), method)
if err != nil {
logger.Error("unmarshalYmlConfig error %v", err.Error())
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}

Expand Down Expand Up @@ -188,7 +279,7 @@ func initAPIConfigServiceFromKvList(config *fc.APIConfig, kList, vList []string)
resource := &fc.Resource{}
err := yaml.UnmarshalYML([]byte(v), resource)
if err != nil {
logger.Error("unmarshalYmlConfig error %v", err.Error())
logger.Errorf("unmarshalYmlConfig error %s", err)
return err
}

Expand Down Expand Up @@ -219,7 +310,7 @@ func listenResourceAndMethodEvent(key string) bool {
for {
wc, err := client.WatchWithOption(key, clientv3.WithPrefix())
if err != nil {
logger.Warnf("Watch api config {key:%s} = error{%v}", key, err)
logger.Warnf("Watch api config {key:%s} = error{%s}", key, err)
return false
}

Expand Down Expand Up @@ -275,7 +366,7 @@ func handleDeleteEvent(key, val []byte) {
resourceIdStr := pathArray[len(pathArray)-1]
ID, err := strconv.Atoi(resourceIdStr)
if err != nil {
logger.Error("handleDeleteEvent ID is not int error %v", err)
logger.Errorf("handleDeleteEvent ID is not int error %s", err)
return
}
deleteApiConfigResource(ID)
Expand All @@ -292,18 +383,24 @@ func handleDeleteEvent(key, val []byte) {
resourceIdStr := pathArray[len(pathArray)-3]
resourceId, err := strconv.Atoi(resourceIdStr)
if err != nil {
logger.Error("handleDeleteEvent ID is not int error %v", err)
logger.Errorf("handleDeleteEvent ID is not int error %s", err)
return
}

methodIdStr := pathArray[len(pathArray)-1]
methodId, err := strconv.Atoi(methodIdStr)
if err != nil {
logger.Error("handleDeleteEvent ID is not int error %v", err)
logger.Errorf("handleDeleteEvent ID is not int error %s", err)
return
}
deleteApiConfigMethod(resourceId, methodId)
}

re = getCheckRatelimitRegexp()
if m := re.Match(key); m {
empty := &fr.Config{}
listener.RateLimitChange(empty)
}
}

func handlePutEvent(key, val []byte) {
Expand All @@ -315,7 +412,7 @@ func handlePutEvent(key, val []byte) {
res := &fc.Resource{}
err := yaml.UnmarshalYML(val, res)
if err != nil {
logger.Error("handlePutEvent UnmarshalYML error %v", err)
logger.Errorf("handlePutEvent UnmarshalYML error %s", err)
return
}
mergeApiConfigResource(*res)
Expand All @@ -327,11 +424,32 @@ func handlePutEvent(key, val []byte) {
res := &fc.Method{}
err := yaml.UnmarshalYML(val, res)
if err != nil {
logger.Error("handlePutEvent UnmarshalYML error %v", err)
logger.Errorf("handlePutEvent UnmarshalYML error %s", err)
return
}
mergeApiConfigMethod(res.ResourcePath, *res)
}

//handle base info
re = getCheckBaseInfoRegexp()
if m := re.Match(key); m {
mergeBaseInfo(val)
return
}

//handle plugins group
re = getCheckPluginsGroupRegexp()
if m := re.Match(key); m {
mergePluginGroup(val)
return
}

//handle ratelimit
re = getCheckRatelimitRegexp()
if m := re.Match(key); m {
mergeRatelimit(val)
return
}
}

func deleteApiConfigResource(resourceId int) {
Expand Down Expand Up @@ -361,6 +479,36 @@ func mergeApiConfigResource(val fc.Resource) {
listener.ResourceAdd(val)
}

func mergeRatelimit(val []byte) {
c := &fr.Config{}
if err := yaml.UnmarshalYML(val, c); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return
}
apiConfig.RateLimit = *c
listener.RateLimitChange(c)
}

func mergePluginGroup(val []byte) {
g := &fc.PluginsGroup{}
if err := yaml.UnmarshalYML(val, g); err != nil {
logger.Errorf("unmarshalYmlConfig error %s", err)
return
}
for i, v := range apiConfig.PluginsGroup {
if v.GroupName == g.GroupName {
apiConfig.PluginsGroup[i] = *g
}
}
listener.PluginGroupChange(apiConfig.PluginsGroup)
}

func mergeBaseInfo(val []byte) {
_ = initBaseInfoFromString(apiConfig, string(val))

listener.PluginPathChange(apiConfig.PluginFilePath)
}

func deleteApiConfigMethod(resourceId, methodId int) {
for _, resource := range apiConfig.Resources {
if resource.ID != resourceId {
Expand Down Expand Up @@ -406,11 +554,19 @@ func getCheckBaseInfoRegexp() *regexp.Regexp {
}

func getCheckResourceRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/Resources/[^/]+/?$")
return regexp.MustCompile(".+/resources/[^/]+/?$")
}

func getExtractMethodRegexp() *regexp.Regexp {
return regexp.MustCompile("Resources/([^/]+)/Method/[^/]+/?$")
return regexp.MustCompile(".+/resources/([^/]+)/method/[^/]+/?$")
}

func getCheckPluginsGroupRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/filter/pluginGroup/[^/]+/?$")
}

func getCheckRatelimitRegexp() *regexp.Regexp {
return regexp.MustCompile(".+/filter/ratelimit/[^/]+/?$")
}

// RegisterConfigListener register APIConfigListener
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/accesslog/access_log.go
Expand Up @@ -40,7 +40,7 @@ import (

var accessLogWriter = &model.AccessLogWriter{AccessLogDataChan: make(chan model.AccessLogData, constant.LogDataBuffer)}

func init() {
func Init() {
extension.SetFilterFunc(constant.AccessLogFilter, accessLog())
accessLogWriter.Write()
}
Expand Down