perf: introduce update quota by redis
Introduce the quota update provider, which improves the performance of
pushing artifacts to the same project under high concurrency by
implementing an optimistic lock in Redis. The feature is disabled by
default; enable it by setting the env 'QUOTA_UPDATE_PROVIDER=Redis' for
the core container.

Fixes: #18440

Signed-off-by: chlins <chenyuzh@vmware.com>
chlins committed Jun 30, 2023
1 parent d84b1d0 commit 8feaa6b
Showing 14 changed files with 573 additions and 118 deletions.
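
The core of the change is a Redis-side optimistic lock: the quota value is read under WATCH and written back in a MULTI/EXEC pipeline, so a concurrent writer invalidates the transaction instead of silently losing an update, and a background task later flushes the cached value to the database. The sketch below is a minimal illustration of that WATCH/TxPipelined pattern with go-redis/v8, not Harbor's implementation; the function name, the plain integer counter, and the retry loop are illustrative, whereas the actual code stores a codec-encoded quota struct under a 'cache:quota:<reference>:<referenceID>' key and reuses the retry package.

package main

import (
	"context"
	"errors"
	"time"

	"github.com/go-redis/redis/v8"
)

// updateQuotaOptimistically increments a quota counter under a WATCH-based
// optimistic lock: if another client writes the key between GET and EXEC,
// the transaction fails with redis.TxFailedErr and is retried.
func updateQuotaOptimistically(ctx context.Context, rdb *redis.Client, key string, delta int64) error {
	txf := func(tx *redis.Tx) error {
		used, err := tx.Get(ctx, key).Int64()
		if err != nil && err != redis.Nil {
			return err
		}

		// Queue the write; it is only applied if the watched key is unchanged.
		_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
			p.Set(ctx, key, used+delta, 5*time.Minute)
			return nil
		})
		return err
	}

	// Retry a bounded number of times when the optimistic lock is lost.
	for i := 0; i < 10; i++ {
		err := rdb.Watch(ctx, txf, key)
		if err == nil {
			return nil
		}
		if err == redis.TxFailedErr {
			continue
		}
		return err
	}
	return errors.New("quota update failed after retries: " + key)
}

In the commit itself, the retry handling comes from the existing retry options, and flushQuota is registered with gtask to push the cached quota back to the database every 30 seconds.
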
2 changes: 2 additions & 0 deletions src/common/const.go
@@ -223,4 +223,6 @@ const (
UIMaxLengthLimitedOfNumber = 10
// ExecutionStatusRefreshIntervalSeconds is the interval seconds for refreshing execution status
ExecutionStatusRefreshIntervalSeconds = "execution_status_refresh_interval_seconds"
// QuotaUpdateProvider is the provider for updating quota, currently Redis and DB are supported
QuotaUpdateProvider = "quota_update_provider"
)
14 changes: 12 additions & 2 deletions src/controller/blob/controller.go
@@ -323,7 +323,12 @@ func (c *controller) Sync(ctx context.Context, references []distribution.Descrip

func (c *controller) SetAcceptedBlobSize(ctx context.Context, sessionID string, size int64) error {
key := blobSizeKey(sessionID)
err := libredis.Instance().Set(ctx, key, size, c.blobSizeExpiration).Err()
rc, err := libredis.GetRegistryClient()
if err != nil {
return err
}

err = rc.Set(ctx, key, size, c.blobSizeExpiration).Err()
if err != nil {
log.Errorf("failed to set accepted blob size for session %s in redis, error: %v", sessionID, err)
return err
@@ -334,7 +339,12 @@ func (c *controller) SetAcceptedBlobSize(ctx context.Context, sessionID string,

func (c *controller) GetAcceptedBlobSize(ctx context.Context, sessionID string) (int64, error) {
key := blobSizeKey(sessionID)
size, err := libredis.Instance().Get(ctx, key).Int64()
rc, err := libredis.GetRegistryClient()
if err != nil {
return 0, err
}

size, err := rc.Get(ctx, key).Int64()
if err != nil {
if err == redis.Nil {
return 0, nil
215 changes: 194 additions & 21 deletions src/controller/quota/controller.go
@@ -19,20 +19,48 @@ import (
"fmt"
"time"

"github.com/go-redis/redis/v8"
"golang.org/x/sync/singleflight"

// quota driver
_ "github.com/goharbor/harbor/src/controller/quota/driver"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/gtask"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
libredis "github.com/goharbor/harbor/src/lib/redis"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/pkg/quota"
"github.com/goharbor/harbor/src/pkg/quota/driver"
"github.com/goharbor/harbor/src/pkg/quota/types"

// init the db config
_ "github.com/goharbor/harbor/src/pkg/config/db"
)

func init() {
// register the async task for flushing quota to db when updating quota by redis is enabled
if provider := config.GetQuotaUpdateProvider(); provider == updateQuotaProviderRedis.String() {
gtask.DefaultPool().AddTask(flushQuota, 30*time.Second)
}

}

type updateQuotaProviderType string

func (t updateQuotaProviderType) String() string {
return string(t)
}

var (
defaultRetryTimeout = time.Minute * 5
// quotaExpireTimeout is the expire time for quota when update quota by redis
quotaExpireTimeout = time.Minute * 5

updateQuotaProviderRedis updateQuotaProviderType = "Redis"
updateQuotaProviderDB updateQuotaProviderType = "DB"
)

var (
@@ -87,6 +115,31 @@ type controller struct {
reservedExpiration time.Duration

quotaMgr quota.Manager
g singleflight.Group
}

// flushQuota flushes the quota info from redis to db asynchronously.
func flushQuota(ctx context.Context) {
iter, err := cache.Default().Scan(ctx, "quota:*")
if err != nil {
log.Errorf("failed to scan out the quota records from redis")
}

for iter.Next(ctx) {
key := iter.Val()
q := &quota.Quota{}
err = cache.Default().Fetch(ctx, key, q)
if err != nil {
log.Errorf("failed to fetch quota: %s, error: %v", key, err)
continue

}

if err = Ctl.Update(ctx, q); err != nil {
log.Errorf("failed to refresh quota: %s, error: %v", key, err)
} else {
log.Debugf("successfully refreshed quota: %s", key)
}

}
}

func (c *controller) Count(ctx context.Context, query *q.Query) (int64, error) {
@@ -163,13 +216,83 @@ func (c *controller) List(ctx context.Context, query *q.Query, options ...Option
return quotas, nil
}

func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), retryOpts ...retry.Option) error {
f := func() error {
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
// updateUsageByDB updates the quota usage directly in the database, so the change takes effect immediately.
func (c *controller) updateUsageByDB(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
return retry.Abort(err)
}

hardLimits, err := q.GetHard()
if err != nil {
return retry.Abort(err)
}

used, err := q.GetUsed()
if err != nil {
return retry.Abort(err)
}

newUsed, err := op(hardLimits, used)
if err != nil {
return retry.Abort(err)
}

// The PR https://github.com/goharbor/harbor/pull/17392 optimized the post-upload-blob logic to check quota with size 0,
// which increases the pressure on the optimistic lock, so return early here
// if the quota usage has not changed, to reduce the probability of optimistic lock conflicts.
if types.Equals(used, newUsed) {
return nil
}

q.SetUsed(newUsed)

err = c.quotaMgr.Update(ctx, q)
if err != nil && !errors.Is(err, orm.ErrOptimisticLock) {
return retry.Abort(err)
}

return err
}

// updateUsageByRedis updates the quota usage in redis and flushes the quota usage to db asynchronously.
func (c *controller) updateUsageByRedis(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
// abort early if the context has an error, such as context canceled
if ctx.Err() != nil {
return retry.Abort(ctx.Err())
}

client, err := libredis.GetCoreClient()
if err != nil {
return retry.Abort(err)
}

// normally cache.Save appends the prefix "cache:"; to keep the keys consistent,
// the raw redis client used here must also add the prefix manually.
key := fmt.Sprintf("%s:quota:%s:%s", "cache", reference, referenceID)
return client.Watch(ctx, func(tx *redis.Tx) error {
data, err := tx.Get(ctx, key).Result()
if err != nil && err != redis.Nil {

return retry.Abort(err)
}

q := &quota.Quota{}
// calc the quota usage in real time if no key found
if err == redis.Nil {
// use singleflight to prevent cache penetration from putting pressure on the database.
realQuota, err, _ := c.g.Do(key, func() (interface{}, error) {
return c.calcQuota(ctx, reference, referenceID)
})
if err != nil {
return retry.Abort(err)
}

q = realQuota.(*quota.Quota)
} else {
if err = cache.DefaultCodec().Decode([]byte(data), q); err != nil {
return retry.Abort(err)
}

}

hardLimits, err := q.GetHard()
if err != nil {
return retry.Abort(err)
@@ -185,21 +308,42 @@ func (c *controller) updateUsageWithRetry(ctx context.Context, reference, refere
return retry.Abort(err)
}

// The PR https://github.com/goharbor/harbor/pull/17392 optimized the logic for post upload blob which use size 0
// for checking quota, this will increase the pressure of optimistic lock, so here return earlier
// if the quota usage has not changed to reduce the probability of optimistic lock.
if types.Equals(used, newUsed) {
return nil
q.SetUsed(newUsed)

val, err := cache.DefaultCodec().Encode(q)
if err != nil {
return retry.Abort(err)

}

q.SetUsed(newUsed)
_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
_, err = p.Set(ctx, key, val, quotaExpireTimeout).Result()
return err
})

err = c.quotaMgr.Update(ctx, q)
if err != nil && !errors.Is(err, orm.ErrOptimisticLock) {
if err != nil && err != redis.TxFailedErr {

return retry.Abort(err)
}

return err
}, key)
}

func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), provider updateQuotaProviderType, retryOpts ...retry.Option) error {
var f func() error
switch provider {
case updateQuotaProviderDB:
f = func() error {
return c.updateUsageByDB(ctx, reference, referenceID, op)
}
case updateQuotaProviderRedis:
f = func() error {
return c.updateUsageByRedis(ctx, reference, referenceID, op)
}
default:
// update quota by db by default
f = func() error {
return c.updateUsageByDB(ctx, reference, referenceID, op)
}

}

options := []retry.Option{
@@ -235,23 +379,25 @@ func (c *controller) Refresh(ctx context.Context, reference, referenceID string,
return newUsed, err
}

return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), opts.RetryOptions...)
// update quota usage via the configured provider for the refresh operation
return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), updateQuotaProviderType(config.GetQuotaUpdateProvider()), opts.RetryOptions...)
}

func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error {
if len(resources) == 0 {
return f()
}

if err := c.updateUsageWithRetry(ctx, reference, referenceID, reserveResources(resources)); err != nil {
provider := updateQuotaProviderType(config.GetQuotaUpdateProvider())
if err := c.updateUsageWithRetry(ctx, reference, referenceID, reserveResources(resources), provider); err != nil {
log.G(ctx).Errorf("reserve resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, err)
return err
}

err := f()

if err != nil {
if er := c.updateUsageWithRetry(ctx, reference, referenceID, rollbackResources(resources)); er != nil {
if er := c.updateUsageWithRetry(ctx, reference, referenceID, rollbackResources(resources), provider); er != nil {
// ignore this error, the quota usage will be corrected when users perform operations that trigger a quota refresh
log.G(ctx).Warningf("rollback resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, er)
}
@@ -260,22 +406,49 @@ func (c *controller) Request(ctx context.Context, reference, referenceID string,
return err
}

// calcQuota calculates the quota and usage in real time.
func (c *controller) calcQuota(ctx context.Context, reference, referenceID string) (*quota.Quota, error) {
// get quota and usage from db
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
return nil, err
}

// the usage in the db may be outdated, calc it in real time
driver, err := Driver(ctx, reference)
if err != nil {
return nil, err
}

newUsed, err := driver.CalculateUsage(ctx, referenceID)
if err != nil {
log.G(ctx).Errorf("failed to calculate quota usage for %s %s, error: %v", reference, referenceID, err)
return nil, err
}

q.SetUsed(newUsed)
return q, nil

}

func (c *controller) Update(ctx context.Context, u *quota.Quota) error {
f := func() error {
q, err := c.quotaMgr.GetByRef(ctx, u.Reference, u.ReferenceID)
if err != nil {
return err
}

if q.Hard != u.Hard {
if hard, err := u.GetHard(); err == nil {
q.SetHard(hard)
if oldHard, err := q.GetHard(); err == nil {
if newHard, err := u.GetHard(); err == nil {
if !types.Equals(oldHard, newHard) {
q.SetHard(newHard)
}

}
}

if q.Used != u.Used {
if used, err := u.GetUsed(); err == nil {
q.SetUsed(used)
if oldUsed, err := q.GetUsed(); err == nil {
if newUsed, err := u.GetUsed(); err == nil {
if !types.Equals(oldUsed, newUsed) {
q.SetUsed(newUsed)
}

}
}

2 changes: 1 addition & 1 deletion src/go.mod
@@ -64,6 +64,7 @@ require (
golang.org/x/crypto v0.5.0
golang.org/x/net v0.9.0
golang.org/x/oauth2 v0.5.0
golang.org/x/sync v0.3.0
golang.org/x/text v0.9.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
gopkg.in/h2non/gock.v1 v1.0.16
@@ -162,7 +163,6 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.0 // indirect
golang.org/x/sync v0.3.0
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
google.golang.org/api v0.110.0 // indirect
8 changes: 4 additions & 4 deletions src/lib/cache/redis/redis_test.go
@@ -126,12 +126,12 @@ func (suite *CacheTestSuite) TestScan() {
}
}
{
// no match should return all keys
// return all keys with test-scan-*
expect := []string{"test-scan-0", "test-scan-1", "test-scan-2"}
// seed data
seed(3)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "")
iter, err := suite.cache.Scan(suite.ctx, "test-scan-*")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
@@ -143,12 +143,12 @@
}

{
// with match should return matched keys
// return matched keys with test-scan-1*
expect := []string{"test-scan-1", "test-scan-10"}
// seed data
seed(11)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "*test-scan-1*")
iter, err := suite.cache.Scan(suite.ctx, "test-scan-1*")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/config/metadata/metadatalist.go
@@ -189,5 +189,6 @@ var (
{Name: common.SessionTimeout, Scope: UserScope, Group: BasicGroup, EnvKey: "SESSION_TIMEOUT", DefaultValue: "60", ItemType: &Int64Type{}, Editable: true, Description: `The session timeout in minutes`},

{Name: common.ExecutionStatusRefreshIntervalSeconds, Scope: SystemScope, Group: BasicGroup, EnvKey: "EXECUTION_STATUS_REFRESH_INTERVAL_SECONDS", DefaultValue: "30", ItemType: &Int64Type{}, Editable: false, Description: `The interval seconds to refresh the execution status`},
{Name: common.QuotaUpdateProvider, Scope: SystemScope, Group: BasicGroup, EnvKey: "QUOTA_UPDATE_PROVIDER", DefaultValue: "DB", ItemType: &StringType{}, Editable: false, Description: `The provider for updating quota, 'DB' or 'Redis' is supported`},
}
)
