Skip to content

Commit

Permalink
fix: dbpriv_limit_frequency_1_4 #4635
Browse files Browse the repository at this point in the history
  • Loading branch information
fanfanyangyang authored and iSecloud committed May 28, 2024
1 parent c9d535f commit 043e162
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 15 deletions.
1 change: 1 addition & 0 deletions dbm-services/mysql/db-priv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.16.0
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.46.1
golang.org/x/time v0.1.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

Expand Down
2 changes: 2 additions & 0 deletions dbm-services/mysql/db-priv/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
2 changes: 1 addition & 1 deletion dbm-services/mysql/db-priv/service/account_rule_mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string)
return err
}

err = AccountRuleExistedPreCheck(m.BkBizId, m.AccountId, *m.ClusterType, dbs, false)
_, err = AccountRulePreCheck(m.BkBizId, m.AccountId, *m.ClusterType, dbs, false)
if err != nil {
return err
}
Expand Down
16 changes: 12 additions & 4 deletions dbm-services/mysql/db-priv/service/add_priv.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package service

import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"dbm-services/common/go-pubpkg/errno"
"dbm-services/mysql/priv-service/util"

Expand Down Expand Up @@ -64,8 +67,10 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
return outerErr
}
AddPrivLog(PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: time.Now()})
tokenBucket := make(chan int, 10)
client := util.NewClientByHosts(viper.GetString("dbmeta"))
limit := rate.Every(time.Millisecond * 200) // QPS:5
burst := 10 // 桶容量 10
limiter := rate.NewLimiter(limit, burst)
for _, rule := range m.AccoutRules { // 添加权限,for acccountRuleList;for instanceList; do create a routine
account, accountRule, outerErr := GetAccountRuleInfo(m.BkBizId, m.ClusterType, m.User, rule.Dbname)
if outerErr != nil {
Expand All @@ -74,10 +79,8 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
}
for _, dns := range m.TargetInstances {
wg.Add(1)
tokenBucket <- 0
go func(dns string) {
defer func() {
<-tokenBucket
wg.Done()
}()
var (
Expand All @@ -94,6 +97,12 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
successInfo = fmt.Sprintf(`%s,授权成功。`, baseInfo)
failInfo = fmt.Sprintf(`%s,授权失败:`, baseInfo)

err = limiter.Wait(context.Background())
if err != nil {
AddErrorOnly(&errMsg, errors.New(failInfo+sep+err.Error()))
return
}

instance, err = GetCluster(client, m.ClusterType, Domain{EntryName: dns})
if err != nil {
AddErrorOnly(&errMsg, errors.New(failInfo+sep+err.Error()))
Expand Down Expand Up @@ -174,7 +183,6 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
}
}
wg.Wait() // 一个协程失败,其报错信息添加到errMsg.errs。主协程wg.Wait(),等待所有协程执行完成才会返回。
close(tokenBucket)
return AddPrivResult(errMsg, successMsg)
}

Expand Down
4 changes: 0 additions & 4 deletions dbm-services/mysql/db-priv/service/admin_password.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,6 @@ func (m *ModifyAdminUserPasswordPara) ModifyAdminPassword(jsonPara string, ticke
// 后台定时任务,1、randmize_daily比如每天执行一次,随机化没有被锁住的实例 2、randmize_expired比如每分钟执行一次随机化锁定过期的实例
// 前台页面,单据已提示实例密码被锁定是否修改,用户确认修改,因此不检查是否锁定

// 调用频繁,不重要,不记录日志
if m.Range != "randmize_expired" {
AddPrivLog(PrivLog{BkBizId: 0, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: time.Now()})
}
if m.Async && m.Range == "randmize_expired" {
// 过滤出需要随机化的实例
errCheck = m.NeedToBeRandomized()
Expand Down
16 changes: 12 additions & 4 deletions dbm-services/mysql/db-priv/service/clone_client_priv.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package service

import (
"context"
"fmt"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"dbm-services/common/go-pubpkg/errno"
"dbm-services/mysql/priv-service/util"

Expand Down Expand Up @@ -48,7 +51,9 @@ func (m *CloneClientPrivParaList) CloneClientPrivDryRun() error {
func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) error {
var errMsg Err
wg := sync.WaitGroup{}
tokenBucket := make(chan int, 10)
limit := rate.Every(time.Millisecond * 200) // QPS:5
burst := 10 // 桶容量 10
limiter := rate.NewLimiter(limit, burst)

if m.BkBizId == 0 {
return errno.BkBizIdIsEmpty
Expand Down Expand Up @@ -116,13 +121,17 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) er
// 每个集群一个协程
for _, item := range clusters {
wg.Add(1)
tokenBucket <- 0
go func(item Cluster) {
defer func() {
<-tokenBucket
wg.Done()
}()

err := limiter.Wait(context.Background())
if err != nil {
AddError(&errMsg, item.ImmuteDomain, err)
return
}

if item.ClusterType == tendbha || item.ClusterType == tendbsingle {
for _, storage := range item.Storages {
address := fmt.Sprintf("%s:%d", storage.IP, storage.Port)
Expand Down Expand Up @@ -171,7 +180,6 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) er
}(item)
}
wg.Wait()
close(tokenBucket)
if len(errMsg.errs) > 0 {
return errno.ClonePrivilegesFail.Add("\n" + strings.Join(errMsg.errs, "\n"))
}
Expand Down
6 changes: 4 additions & 2 deletions dbm-ui/backend/ticket/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ def get_db_type_by_ticket(cls, ticket_type):
MYSQL_MASTER_FAIL_OVER = TicketEnumField("MYSQL_MASTER_FAIL_OVER", _("MySQL 主库故障切换"), _("集群维护"))
MYSQL_HA_APPLY = TicketEnumField("MYSQL_HA_APPLY", _("MySQL 高可用部署"), register_iam=False)
MYSQL_IMPORT_SQLFILE = TicketEnumField("MYSQL_IMPORT_SQLFILE", _("MySQL 变更SQL执行"), _("SQL 任务"))
MYSQL_FORCE_IMPORT_SQLFILE = TicketEnumField("MYSQL_FORCE_IMPORT_SQLFILE", _("MySQL 强制变更SQL执行"), _("SQL 任务"), register_iam=False) # noqa
MYSQL_FORCE_IMPORT_SQLFILE = TicketEnumField("MYSQL_FORCE_IMPORT_SQLFILE",
_("MySQL 强制变更SQL执行"), _("SQL 任务"), register_iam=False) # noqa
MYSQL_SEMANTIC_CHECK = TicketEnumField("MYSQL_SEMANTIC_CHECK", _("MySQL 模拟执行"), register_iam=False)
MYSQL_PROXY_ADD = TicketEnumField("MYSQL_PROXY_ADD", _("MySQL 添加Proxy"), _("集群维护"))
MYSQL_PROXY_SWITCH = TicketEnumField("MYSQL_PROXY_SWITCH", _("MySQL 替换Proxy"), _("集群维护"))
Expand Down Expand Up @@ -215,7 +216,8 @@ def get_db_type_by_ticket(cls, ticket_type):
TENDBCLUSTER_SPIDER_SLAVE_DESTROY = TicketEnumField("TENDBCLUSTER_SPIDER_SLAVE_DESTROY", _("TenDB Cluster 只读接入层下架"), _("访问入口")) # noqa
TENDBCLUSTER_RESTORE_SLAVE = TicketEnumField("TENDBCLUSTER_RESTORE_SLAVE", _("TenDB Cluster Slave重建"), _("集群维护")) # noqa
TENDBCLUSTER_RESTORE_LOCAL_SLAVE = TicketEnumField("TENDBCLUSTER_RESTORE_LOCAL_SLAVE", _("TenDB Cluster Slave原地重建"), _("集群维护")) # noqa
TENDBCLUSTER_MIGRATE_CLUSTER = TicketEnumField("TENDBCLUSTER_MIGRATE_CLUSTER", _("TenDB Cluster 主从迁移"), _("集群维护")) # noqa
TENDBCLUSTER_MIGRATE_CLUSTER = TicketEnumField("TENDBCLUSTER_MIGRATE_CLUSTER",
_("TenDB Cluster 主从迁移"), _("集群维护")) # noqa
TENDBCLUSTER_APPLY = TicketEnumField("TENDBCLUSTER_APPLY", _("TenDB Cluster 集群部署"))
TENDBCLUSTER_ENABLE = TicketEnumField("TENDBCLUSTER_ENABLE", _("TenDB Cluster 集群启用"), register_iam=False)
TENDBCLUSTER_DISABLE = TicketEnumField("TENDBCLUSTER_DISABLE", _("TenDB Cluster 集群禁用"), register_iam=False)
Expand Down

0 comments on commit 043e162

Please sign in to comment.