perf: introduce update quota by redis
Introduce the quota update provider, which improves the performance of
pushing artifacts to the same project under high concurrency by
implementing an optimistic lock in redis. The feature is disabled by
default; enable it by setting the env 'QUOTA_UPDATE_PROVIDER=Redis' for
the core container.

Fixes: #18440

Signed-off-by: chlins <chenyuzh@vmware.com>
chlins committed Jul 10, 2023
1 parent adf80e9 commit d7fdfc5
Showing 14 changed files with 573 additions and 118 deletions.
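The change hinges on Redis optimistic locking: the quota key is WATCHed, the new usage is computed, and the write is committed in a MULTI/EXEC pipeline that only succeeds if no other client touched the key in the meantime; on conflict the operation is retried. Below is a minimal, self-contained sketch of that pattern with go-redis v8. The key name, value layout, and retry loop are illustrative assumptions, not Harbor's actual implementation.

package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/go-redis/redis/v8"
)

// incrementWithWatch sketches the optimistic-lock pattern used by the quota
// controller: WATCH the key, compute the new value, then commit it inside a
// MULTI/EXEC pipeline. If another client modifies the key between WATCH and
// EXEC, the transaction fails with redis.TxFailedErr and the caller retries.
func incrementWithWatch(ctx context.Context, rdb *redis.Client, key string, delta int64) error {
	return rdb.Watch(ctx, func(tx *redis.Tx) error {
		cur, err := tx.Get(ctx, key).Int64()
		if err != nil && err != redis.Nil {
			return err
		}
		// The queued commands run only if the watched key is still unchanged.
		_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
			p.Set(ctx, key, strconv.FormatInt(cur+delta, 10), 0)
			return nil
		})
		return err
	}, key)
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	ctx := context.Background()
	// Retry a few times on conflict, as the controller does via its retry options.
	for i := 0; i < 3; i++ {
		err := incrementWithWatch(ctx, rdb, "demo:quota:storage", 1024)
		if err != redis.TxFailedErr {
			fmt.Println("result:", err)
			break
		}
	}
}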
2 changes: 2 additions & 0 deletions src/common/const.go
@@ -226,4 +226,6 @@ const (
UIMaxLengthLimitedOfNumber = 10
// ExecutionStatusRefreshIntervalSeconds is the interval seconds for refreshing execution status
ExecutionStatusRefreshIntervalSeconds = "execution_status_refresh_interval_seconds"
// QuotaUpdateProvider is the provider for updating quota, currently supports Redis and DB
QuotaUpdateProvider = "quota_update_provider"
)
14 changes: 12 additions & 2 deletions src/controller/blob/controller.go
@@ -323,7 +323,12 @@ func (c *controller) Sync(ctx context.Context, references []distribution.Descrip

func (c *controller) SetAcceptedBlobSize(ctx context.Context, sessionID string, size int64) error {
key := blobSizeKey(sessionID)
err := libredis.Instance().Set(ctx, key, size, c.blobSizeExpiration).Err()
rc, err := libredis.GetRegistryClient()
if err != nil {
return err
}

err = rc.Set(ctx, key, size, c.blobSizeExpiration).Err()
if err != nil {
log.Errorf("failed to set accepted blob size for session %s in redis, error: %v", sessionID, err)
return err
@@ -334,7 +339,12 @@ func (c *controller) SetAcceptedBlobSize(ctx context.Context, sessionID string,

func (c *controller) GetAcceptedBlobSize(ctx context.Context, sessionID string) (int64, error) {
key := blobSizeKey(sessionID)
size, err := libredis.Instance().Get(ctx, key).Int64()
rc, err := libredis.GetRegistryClient()
if err != nil {
return 0, err
}

size, err := rc.Get(ctx, key).Int64()
if err != nil {
if err == redis.Nil {
return 0, nil
215 changes: 194 additions & 21 deletions src/controller/quota/controller.go
@@ -19,20 +19,48 @@ import (
"fmt"
"time"

"github.com/go-redis/redis/v8"
"golang.org/x/sync/singleflight"

// quota driver
_ "github.com/goharbor/harbor/src/controller/quota/driver"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/gtask"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
libredis "github.com/goharbor/harbor/src/lib/redis"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/pkg/quota"
"github.com/goharbor/harbor/src/pkg/quota/driver"
"github.com/goharbor/harbor/src/pkg/quota/types"

// init the db config
_ "github.com/goharbor/harbor/src/pkg/config/db"
)

func init() {
// register the async task for flushing quota to db when updating quota by redis is enabled
if provider := config.GetQuotaUpdateProvider(); provider == updateQuotaProviderRedis.String() {
gtask.DefaultPool().AddTask(flushQuota, 30*time.Second)
}
}

type updateQuotaProviderType string

func (t updateQuotaProviderType) String() string {
return string(t)
}

var (
defaultRetryTimeout = time.Minute * 5
// quotaExpireTimeout is the expire time for quota when update quota by redis
quotaExpireTimeout = time.Minute * 5

updateQuotaProviderRedis updateQuotaProviderType = "Redis"
updateQuotaProviderDB updateQuotaProviderType = "DB"
)

var (
@@ -87,6 +115,31 @@ type controller struct {
reservedExpiration time.Duration

quotaMgr quota.Manager
g singleflight.Group
}

// flushQuota flushes the quota info from redis to db asynchronously.
func flushQuota(ctx context.Context) {
iter, err := cache.Default().Scan(ctx, "quota:*")
if err != nil {
log.Errorf("failed to scan out the quota records from redis")
}

for iter.Next(ctx) {
key := iter.Val()
q := &quota.Quota{}
err = cache.Default().Fetch(ctx, key, q)
if err != nil {
log.Errorf("failed to fetch quota: %s, error: %v", key, err)
continue
}

if err = Ctl.Update(ctx, q); err != nil {
log.Errorf("failed to refresh quota: %s, error: %v", key, err)
} else {
log.Debugf("successfully refreshed quota: %s", key)
}
}
}

func (c *controller) Count(ctx context.Context, query *q.Query) (int64, error) {
@@ -163,13 +216,83 @@ func (c *controller) List(ctx context.Context, query *q.Query, options ...Option
return quotas, nil
}

func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), retryOpts ...retry.Option) error {
f := func() error {
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
// updateUsageByDB updates the quota usage directly in the database, so the change takes effect immediately.
func (c *controller) updateUsageByDB(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
return retry.Abort(err)
}

hardLimits, err := q.GetHard()
if err != nil {
return retry.Abort(err)
}

used, err := q.GetUsed()
if err != nil {
return retry.Abort(err)
}

newUsed, err := op(hardLimits, used)
if err != nil {
return retry.Abort(err)
}

// The PR https://github.com/goharbor/harbor/pull/17392 optimized the post-upload-blob logic, which uses size 0
// when checking quota; this increases contention on the optimistic lock, so return early
// if the quota usage has not changed to reduce the probability of lock conflicts.
if types.Equals(used, newUsed) {
return nil
}

q.SetUsed(newUsed)

err = c.quotaMgr.Update(ctx, q)
if err != nil && !errors.Is(err, orm.ErrOptimisticLock) {
return retry.Abort(err)
}

return err
}

// updateUsageByRedis updates the quota usage in redis and flushes the quota usage to db asynchronously.
func (c *controller) updateUsageByRedis(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
// abort early if the context carries an error, such as context canceled
if ctx.Err() != nil {
return retry.Abort(ctx.Err())
}

client, err := libredis.GetCoreClient()
if err != nil {
return retry.Abort(err)
}
// cache.Save normally prepends the "cache:" prefix; to stay consistent,
// the raw redis client used here must add the prefix manually.
key := fmt.Sprintf("%s:quota:%s:%s", "cache", reference, referenceID)
return client.Watch(ctx, func(tx *redis.Tx) error {
data, err := tx.Get(ctx, key).Result()
if err != nil && err != redis.Nil {
return retry.Abort(err)
}

q := &quota.Quota{}
// calc the quota usage in real time if no key found
if err == redis.Nil {
// use singleflight to prevent cache penetration from putting pressure on the database.
realQuota, err, _ := c.g.Do(key, func() (interface{}, error) {
return c.calcQuota(ctx, reference, referenceID)
})
if err != nil {
return retry.Abort(err)
}

q = realQuota.(*quota.Quota)
} else {
if err = cache.DefaultCodec().Decode([]byte(data), q); err != nil {
return retry.Abort(err)
}
}

hardLimits, err := q.GetHard()
if err != nil {
return retry.Abort(err)
@@ -185,21 +308,42 @@ func (c *controller) updateUsageWithRetry(ctx context.Context, reference, refere
return retry.Abort(err)
}

// The PR https://github.com/goharbor/harbor/pull/17392 optimized the logic for post upload blob which use size 0
// for checking quota, this will increase the pressure of optimistic lock, so here return earlier
// if the quota usage has not changed to reduce the probability of optimistic lock.
if types.Equals(used, newUsed) {
return nil
q.SetUsed(newUsed)

val, err := cache.DefaultCodec().Encode(q)
if err != nil {
return retry.Abort(err)
}

q.SetUsed(newUsed)
_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
_, err = p.Set(ctx, key, val, quotaExpireTimeout).Result()
return err
})

err = c.quotaMgr.Update(ctx, q)
if err != nil && !errors.Is(err, orm.ErrOptimisticLock) {
if err != nil && err != redis.TxFailedErr {
return retry.Abort(err)
}

return err
}, key)
}

func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), provider updateQuotaProviderType, retryOpts ...retry.Option) error {
var f func() error
switch provider {
case updateQuotaProviderDB:
f = func() error {
return c.updateUsageByDB(ctx, reference, referenceID, op)
}
case updateQuotaProviderRedis:
f = func() error {
return c.updateUsageByRedis(ctx, reference, referenceID, op)
}
default:
// by default, update quota by db
f = func() error {
return c.updateUsageByDB(ctx, reference, referenceID, op)
}
}

options := []retry.Option{
@@ -235,23 +379,25 @@ func (c *controller) Refresh(ctx context.Context, reference, referenceID string,
return newUsed, err
}

return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), opts.RetryOptions...)
// update quota usage by db for refresh operation
return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), updateQuotaProviderType(config.GetQuotaUpdateProvider()), opts.RetryOptions...)
}

func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error {
if len(resources) == 0 {
return f()
}

if err := c.updateUsageWithRetry(ctx, reference, referenceID, reserveResources(resources)); err != nil {
provider := updateQuotaProviderType(config.GetQuotaUpdateProvider())
if err := c.updateUsageWithRetry(ctx, reference, referenceID, reserveResources(resources), provider); err != nil {
log.G(ctx).Errorf("reserve resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, err)
return err
}

err := f()

if err != nil {
if er := c.updateUsageWithRetry(ctx, reference, referenceID, rollbackResources(resources)); er != nil {
if er := c.updateUsageWithRetry(ctx, reference, referenceID, rollbackResources(resources), provider); er != nil {
// ignore this error; the quota usage will be corrected when users perform operations that refresh the quota
log.G(ctx).Warningf("rollback resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, er)
}
@@ -260,22 +406,49 @@ func (c *controller) Request(ctx context.Context, reference, referenceID string,
return err
}

// calcQuota calculates the quota and usage in real time.
func (c *controller) calcQuota(ctx context.Context, reference, referenceID string) (*quota.Quota, error) {
// get quota and usage from db
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
return nil, err
}
// the usage in the db may be outdated, calculate it in real time
driver, err := Driver(ctx, reference)
if err != nil {
return nil, err
}

newUsed, err := driver.CalculateUsage(ctx, referenceID)
if err != nil {
log.G(ctx).Errorf("failed to calculate quota usage for %s %s, error: %v", reference, referenceID, err)
return nil, err
}

q.SetUsed(newUsed)
return q, nil
}

func (c *controller) Update(ctx context.Context, u *quota.Quota) error {
f := func() error {
q, err := c.quotaMgr.GetByRef(ctx, u.Reference, u.ReferenceID)
if err != nil {
return err
}

if q.Hard != u.Hard {
if hard, err := u.GetHard(); err == nil {
q.SetHard(hard)
if oldHard, err := q.GetHard(); err == nil {
if newHard, err := u.GetHard(); err == nil {
if !types.Equals(oldHard, newHard) {
q.SetHard(newHard)
}
}
}

if q.Used != u.Used {
if used, err := u.GetUsed(); err == nil {
q.SetUsed(used)
if oldUsed, err := q.GetUsed(); err == nil {
if newUsed, err := u.GetUsed(); err == nil {
if !types.Equals(oldUsed, newUsed) {
q.SetUsed(newUsed)
}
}
}

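A note on the singleflight usage in updateUsageByRedis above: when the quota key is absent from Redis, only one goroutine recomputes the usage from the database while concurrent callers for the same key share that single result, which keeps a burst of pushes from hammering the database. The sketch below shows the pattern in isolation; loadQuota is a hypothetical stand-in for the real usage calculation.

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

var group singleflight.Group

// loadQuota is a hypothetical stand-in for an expensive recomputation,
// e.g. calculating quota usage from the database.
func loadQuota(projectID string) (int64, error) {
	fmt.Println("recomputing quota for", projectID) // runs once per key per flight
	return 42, nil
}

// getQuota collapses concurrent lookups for the same key into a single call;
// all waiters receive the same result and error.
func getQuota(projectID string) (int64, error) {
	v, err, _ := group.Do("quota:"+projectID, func() (interface{}, error) {
		return loadQuota(projectID)
	})
	if err != nil {
		return 0, err
	}
	return v.(int64), nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if n, err := getQuota("library"); err == nil {
				fmt.Println("quota:", n)
			}
		}()
	}
	wg.Wait()
}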
2 changes: 1 addition & 1 deletion src/go.mod
@@ -64,6 +64,7 @@ require (
golang.org/x/crypto v0.5.0
golang.org/x/net v0.9.0
golang.org/x/oauth2 v0.5.0
golang.org/x/sync v0.3.0
golang.org/x/text v0.9.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
gopkg.in/h2non/gock.v1 v1.0.16
@@ -162,7 +163,6 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.0 // indirect
golang.org/x/sync v0.3.0
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
google.golang.org/api v0.110.0 // indirect
8 changes: 4 additions & 4 deletions src/lib/cache/redis/redis_test.go
@@ -126,12 +126,12 @@ func (suite *CacheTestSuite) TestScan() {
}
}
{
// no match should return all keys
// return all keys with test-scan-*
expect := []string{"test-scan-0", "test-scan-1", "test-scan-2"}
// seed data
seed(3)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "")
iter, err := suite.cache.Scan(suite.ctx, "test-scan-*")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
@@ -143,12 +143,12 @@
}

{
// with match should return matched keys
// return matched keys with test-scan-1*
expect := []string{"test-scan-1", "test-scan-10"}
// seed data
seed(11)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "*test-scan-1*")
iter, err := suite.cache.Scan(suite.ctx, "test-scan-1*")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
1 change: 1 addition & 0 deletions src/lib/config/metadata/metadatalist.go
@@ -191,5 +191,6 @@ var (
{Name: common.ExecutionStatusRefreshIntervalSeconds, Scope: SystemScope, Group: BasicGroup, EnvKey: "EXECUTION_STATUS_REFRESH_INTERVAL_SECONDS", DefaultValue: "30", ItemType: &Int64Type{}, Editable: false, Description: `The interval seconds to refresh the execution status`},

{Name: common.BannerMessage, Scope: UserScope, Group: BasicGroup, EnvKey: "BANNER_MESSAGE", DefaultValue: "", ItemType: &StringType{}, Editable: true, Description: `The customized banner message for the UI`},
{Name: common.QuotaUpdateProvider, Scope: SystemScope, Group: BasicGroup, EnvKey: "QUOTA_UPDATE_PROVIDER", DefaultValue: "db", ItemType: &StringType{}, Editable: false, Description: `The provider for updating quota, 'db' or 'redis' is supported`},
}
)
