Skip to content
This repository has been archived by the owner on May 23, 2023. It is now read-only.

Commit

Permalink
better sync and disconnected behavior for quota
Browse files Browse the repository at this point in the history
Change-Id: Ic516685d2bcca397232ea662abb8af3508a86513
  • Loading branch information
theganyo committed Apr 25, 2019
1 parent acc5936 commit 36e4477
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 187 deletions.
125 changes: 73 additions & 52 deletions adapter/quota/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,44 @@ import (
type bucket struct {
manager *Manager
quotaURL string
prototype *Request
requests []*Request
request *Request // accumulated for sync
result *Result
created time.Time
lock sync.RWMutex
now func() time.Time
synced time.Time // last sync
checked time.Time // last apply
refreshAfter time.Duration // after synced
deleteAfter time.Duration // after checked
synced time.Time // last sync time
checked time.Time // last apply time
refreshAfter time.Duration // duration after synced
deleteAfter time.Duration // duration after checked
invalidAfter time.Time // result window is no longer valid after this
syncError error
}

func newBucket(req *Request, m *Manager, auth *auth.Context) *bucket {
func newBucket(req Request, m *Manager, auth *auth.Context) *bucket {
org := auth.Context.Organization()
env := auth.Context.Environment()
quotaURL := *m.baseURL
quotaURL.Path = path.Join(quotaURL.Path, fmt.Sprintf(quotaPath, org, env))
return &bucket{
prototype: req,
request: &req,
manager: m,
quotaURL: quotaURL.String(),
requests: nil,
result: nil,
created: m.now(),
checked: m.now(),
lock: sync.RWMutex{},
now: m.now,
deleteAfter: defaultDeleteAfter,
refreshAfter: defaultRefreshAfter,
}
}

func (b *bucket) now() time.Time {
return b.manager.now()
}

// apply a quota request to the local quota bucket and schedule for sync
func (b *bucket) apply(m *Manager, req *Request) (*Result, error) {
if !b.isCompatible(req) {
func (b *bucket) apply(req *Request) (*Result, error) {

if !b.compatible(req) {
return nil, fmt.Errorf("incompatible quota buckets")
}

Expand All @@ -79,63 +83,67 @@ func (b *bucket) apply(m *Manager, req *Request) (*Result, error) {
res.Used = b.result.Used // start from last result
res.Used += b.result.Exceeded
}
var dupRequest bool
for _, r := range b.requests {
res.Used += r.Weight
if req.DeduplicationID != "" && r.DeduplicationID == req.DeduplicationID {
dupRequest = true
}
}
if !dupRequest {
res.Used += req.Weight
b.requests = append(b.requests, req)
}

b.request.Weight += req.Weight
res.Used += b.request.Weight

if res.Used > res.Allowed {
res.Exceeded = res.Used - res.Allowed
res.Used = res.Allowed
}
return res, nil

return res, b.syncError
}

func (b *bucket) isCompatible(r *Request) bool {
return b.prototype.Interval == r.Interval &&
b.prototype.Allow == r.Allow &&
b.prototype.TimeUnit == r.TimeUnit &&
b.prototype.Identifier == r.Identifier
func (b *bucket) compatible(r *Request) bool {
return b.request.Interval == r.Interval &&
b.request.Allow == r.Allow &&
b.request.TimeUnit == r.TimeUnit &&
b.request.Identifier == r.Identifier
}

// sync local quota bucket with server
func (b *bucket) sync(m *Manager) {
b.lock.Lock()
requests := b.requests
b.requests = nil
b.lock.Unlock()
// single-threaded call - managed by manager
func (b *bucket) sync() error {

log := b.manager.log
log.Debugf("syncing quota %s", b.request.Identifier)

var weight int64
for _, r := range requests {
weight += r.Weight
revert := func(err error) error {
err = log.Errorf("unable to sync quota %s: %v", b.request.Identifier, err)
b.lock.Lock()
b.syncError = err
b.lock.Unlock()
return err
}

r := *b.prototype // make copy
r.Weight = weight
b.lock.Lock()
r := *b.request // make copy

if b.windowExpired() {
r.Weight = 0 // if expired, don't send Weight
}
b.lock.Unlock()

body := new(bytes.Buffer)
json.NewEncoder(body).Encode(r)
err := json.NewEncoder(body).Encode(r)
if err != nil {
return revert(fmt.Errorf("encode: %v", err))
}

req, err := http.NewRequest(http.MethodPost, b.quotaURL, body)
if err != nil {
m.log.Errorf("unable to create quota sync request: %v", err)
return revert(fmt.Errorf("new request: %v", err))
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")

m.log.Debugf("Sending to %s: %s", b.quotaURL, body)
log.Debugf("sending quota: %s", body)

resp, err := m.client.Do(req)
resp, err := b.manager.client.Do(req)
if err != nil {
m.log.Errorf("unable to sync quota: %v", err)
return
return revert(fmt.Errorf("do request: %v", err))
}
defer resp.Body.Close()

Expand All @@ -147,29 +155,42 @@ func (b *bucket) sync(m *Manager) {
case 200:
var quotaResult Result
if err = json.Unmarshal(respBody, &quotaResult); err != nil {
m.log.Errorf("Error unmarshalling: %s", string(respBody))
return
return revert(fmt.Errorf("bad response: %s", string(respBody)))
}

m.log.Debugf("quota result: %#v", quotaResult)
log.Debugf("quota synced: %#v", quotaResult)
b.lock.Lock()
b.synced = b.now()
if b.result != nil && b.result.ExpiryTime != quotaResult.ExpiryTime {
b.request.Weight = 0
} else {
b.request.Weight -= r.Weight // same window, keep accumulated Weight
}
b.result = &quotaResult
b.syncError = nil
b.lock.Unlock()
return nil

default:
m.log.Errorf("quota sync failed. result: %s", string(respBody))
return revert(fmt.Errorf("bad response (%d): %s", resp.StatusCode, string(respBody)))
}
}

func (b *bucket) needToDelete() bool {
b.lock.RLock()
defer b.lock.RUnlock()
return b.requests == nil && b.now().After(b.checked.Add(b.deleteAfter))
return b.request.Weight == 0 && b.now().After(b.checked.Add(b.deleteAfter))
}

func (b *bucket) needToSync() bool {
b.lock.RLock()
defer b.lock.RUnlock()
return b.requests != nil || b.now().After(b.synced.Add(b.refreshAfter))
return b.request.Weight > 0 || b.now().After(b.synced.Add(b.refreshAfter))
}

func (b *bucket) windowExpired() bool {
if b.result != nil {
return b.now().After(time.Unix(b.result.ExpiryTime, 0))
}
return false
}
Loading

0 comments on commit 36e4477

Please sign in to comment.