From 36e44771cd5cecb6b167a7fb936f8e5fa2e09070 Mon Sep 17 00:00:00 2001 From: Scott Ganyo Date: Wed, 24 Apr 2019 16:33:28 -0700 Subject: [PATCH] better sync and disconnected behavior for quota Change-Id: Ic516685d2bcca397232ea662abb8af3508a86513 --- adapter/quota/bucket.go | 125 ++++++----- adapter/quota/bucket_test.go | 92 +++++---- adapter/quota/manager.go | 67 ++++-- adapter/quota/manager_test.go | 321 +++++++++++++++++++++++------ adapter/quota/result_cache.go | 46 +++++ adapter/quota/result_cache_test.go | 35 ++++ adapter/quota/structs.go | 17 +- 7 files changed, 516 insertions(+), 187 deletions(-) create mode 100644 adapter/quota/result_cache.go create mode 100644 adapter/quota/result_cache_test.go diff --git a/adapter/quota/bucket.go b/adapter/quota/bucket.go index 9013bce..6e278da 100644 --- a/adapter/quota/bucket.go +++ b/adapter/quota/bucket.go @@ -30,40 +30,44 @@ import ( type bucket struct { manager *Manager quotaURL string - prototype *Request - requests []*Request + request *Request // accumulated for sync result *Result created time.Time lock sync.RWMutex - now func() time.Time - synced time.Time // last sync - checked time.Time // last apply - refreshAfter time.Duration // after synced - deleteAfter time.Duration // after checked + synced time.Time // last sync time + checked time.Time // last apply time + refreshAfter time.Duration // duration after synced + deleteAfter time.Duration // duration after checked + invalidAfter time.Time // result window is no longer valid after this + syncError error } -func newBucket(req *Request, m *Manager, auth *auth.Context) *bucket { +func newBucket(req Request, m *Manager, auth *auth.Context) *bucket { org := auth.Context.Organization() env := auth.Context.Environment() quotaURL := *m.baseURL quotaURL.Path = path.Join(quotaURL.Path, fmt.Sprintf(quotaPath, org, env)) return &bucket{ - prototype: req, + request: &req, manager: m, quotaURL: quotaURL.String(), - requests: nil, result: nil, created: m.now(), + checked: m.now(), lock: sync.RWMutex{}, - now: m.now, deleteAfter: defaultDeleteAfter, refreshAfter: defaultRefreshAfter, } } +func (b *bucket) now() time.Time { + return b.manager.now() +} + // apply a quota request to the local quota bucket and schedule for sync -func (b *bucket) apply(m *Manager, req *Request) (*Result, error) { - if !b.isCompatible(req) { +func (b *bucket) apply(req *Request) (*Result, error) { + + if !b.compatible(req) { return nil, fmt.Errorf("incompatible quota buckets") } @@ -79,63 +83,67 @@ func (b *bucket) apply(m *Manager, req *Request) (*Result, error) { res.Used = b.result.Used // start from last result res.Used += b.result.Exceeded } - var dupRequest bool - for _, r := range b.requests { - res.Used += r.Weight - if req.DeduplicationID != "" && r.DeduplicationID == req.DeduplicationID { - dupRequest = true - } - } - if !dupRequest { - res.Used += req.Weight - b.requests = append(b.requests, req) - } + + b.request.Weight += req.Weight + res.Used += b.request.Weight + if res.Used > res.Allowed { res.Exceeded = res.Used - res.Allowed res.Used = res.Allowed } - return res, nil + + return res, b.syncError } -func (b *bucket) isCompatible(r *Request) bool { - return b.prototype.Interval == r.Interval && - b.prototype.Allow == r.Allow && - b.prototype.TimeUnit == r.TimeUnit && - b.prototype.Identifier == r.Identifier +func (b *bucket) compatible(r *Request) bool { + return b.request.Interval == r.Interval && + b.request.Allow == r.Allow && + b.request.TimeUnit == r.TimeUnit && + b.request.Identifier == r.Identifier } // sync local quota bucket with server -func (b *bucket) sync(m *Manager) { - b.lock.Lock() - requests := b.requests - b.requests = nil - b.lock.Unlock() +// single-threaded call - managed by manager +func (b *bucket) sync() error { + + log := b.manager.log + log.Debugf("syncing quota %s", b.request.Identifier) - var weight int64 - for _, r := range requests { - weight += r.Weight + revert := func(err error) error { + err = log.Errorf("unable to sync quota %s: %v", b.request.Identifier, err) + b.lock.Lock() + b.syncError = err + b.lock.Unlock() + return err } - r := *b.prototype // make copy - r.Weight = weight + b.lock.Lock() + r := *b.request // make copy + + if b.windowExpired() { + r.Weight = 0 // if expired, don't send Weight + } + b.lock.Unlock() body := new(bytes.Buffer) - json.NewEncoder(body).Encode(r) + err := json.NewEncoder(body).Encode(r) + if err != nil { + return revert(fmt.Errorf("encode: %v", err)) + } req, err := http.NewRequest(http.MethodPost, b.quotaURL, body) if err != nil { - m.log.Errorf("unable to create quota sync request: %v", err) + return revert(fmt.Errorf("new request: %v", err)) } req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "application/json") - m.log.Debugf("Sending to %s: %s", b.quotaURL, body) + log.Debugf("sending quota: %s", body) - resp, err := m.client.Do(req) + resp, err := b.manager.client.Do(req) if err != nil { - m.log.Errorf("unable to sync quota: %v", err) - return + return revert(fmt.Errorf("do request: %v", err)) } defer resp.Body.Close() @@ -147,29 +155,42 @@ func (b *bucket) sync(m *Manager) { case 200: var quotaResult Result if err = json.Unmarshal(respBody, "aResult); err != nil { - m.log.Errorf("Error unmarshalling: %s", string(respBody)) - return + return revert(fmt.Errorf("bad response: %s", string(respBody))) } - m.log.Debugf("quota result: %#v", quotaResult) + log.Debugf("quota synced: %#v", quotaResult) b.lock.Lock() b.synced = b.now() + if b.result != nil && b.result.ExpiryTime != quotaResult.ExpiryTime { + b.request.Weight = 0 + } else { + b.request.Weight -= r.Weight // same window, keep accumulated Weight + } b.result = "aResult + b.syncError = nil b.lock.Unlock() + return nil default: - m.log.Errorf("quota sync failed. result: %s", string(respBody)) + return revert(fmt.Errorf("bad response (%d): %s", resp.StatusCode, string(respBody))) } } func (b *bucket) needToDelete() bool { b.lock.RLock() defer b.lock.RUnlock() - return b.requests == nil && b.now().After(b.checked.Add(b.deleteAfter)) + return b.request.Weight == 0 && b.now().After(b.checked.Add(b.deleteAfter)) } func (b *bucket) needToSync() bool { b.lock.RLock() defer b.lock.RUnlock() - return b.requests != nil || b.now().After(b.synced.Add(b.refreshAfter)) + return b.request.Weight > 0 || b.now().After(b.synced.Add(b.refreshAfter)) +} + +func (b *bucket) windowExpired() bool { + if b.result != nil { + return b.now().After(time.Unix(b.result.ExpiryTime, 0)) + } + return false } diff --git a/adapter/quota/bucket_test.go b/adapter/quota/bucket_test.go index 113c389..b16ea4b 100644 --- a/adapter/quota/bucket_test.go +++ b/adapter/quota/bucket_test.go @@ -15,8 +15,6 @@ package quota import ( - "net/http" - "net/url" "reflect" "sync" "testing" @@ -25,15 +23,18 @@ import ( func TestBucket(t *testing.T) { now := func() time.Time { return time.Unix(1521221450, 0) } + m := &Manager{now: now} cases := map[string]struct { - priorRequests []*Request - priorResult *Result - request *Request - want *Result + priorRequest *Request + priorResult *Result + request *Request + want *Result }{ "First request": { - nil, + &Request{ + Allow: 3, + }, nil, &Request{ Allow: 3, @@ -48,8 +49,9 @@ func TestBucket(t *testing.T) { }, }, "Valid request": { - []*Request{ - {Weight: 1}, + &Request{ + Allow: 4, + Weight: 1, }, &Result{Used: 2}, &Request{ @@ -65,9 +67,9 @@ func TestBucket(t *testing.T) { }, }, "Newly exceeded": { - []*Request{ - {Weight: 1}, - {Weight: 2}, + &Request{ + Allow: 7, + Weight: 3, }, &Result{Used: 3}, &Request{ @@ -83,7 +85,9 @@ func TestBucket(t *testing.T) { }, }, "Previously exceeded": { - []*Request{}, + &Request{ + Allow: 3, + }, &Result{ Used: 3, Exceeded: 1, @@ -102,23 +106,19 @@ func TestBucket(t *testing.T) { }, } - m := newManager(&url.URL{}, http.DefaultClient) - for id, c := range cases { t.Logf("** Executing test case '%s' **", id) b := &bucket{ - prototype: c.request, - quotaURL: "", - requests: c.priorRequests, + manager: m, + request: c.priorRequest, result: c.priorResult, created: now(), lock: sync.RWMutex{}, - now: now, deleteAfter: defaultDeleteAfter, } - res, err := b.apply(m, c.request) + res, err := b.apply(c.request) if err != nil { t.Errorf("should not get error: %v", err) } @@ -131,36 +131,40 @@ func TestBucket(t *testing.T) { func TestNeedToDelete(t *testing.T) { now := func() time.Time { return time.Unix(1521221450, 0) } + m := &Manager{now: now} cases := map[string]struct { - requests []*Request - checked time.Time - want bool + request *Request + checked time.Time + want bool }{ "empty": { - want: true, + request: &Request{}, + want: true, }, "recently checked": { + request: &Request{}, checked: now(), want: false, }, "not recently checked": { + request: &Request{}, checked: now().Add(-time.Hour), want: true, }, "has pending requests": { - requests: []*Request{}, - checked: now().Add(-time.Hour), - want: false, + request: &Request{Weight: 1}, + checked: now().Add(-time.Hour), + want: false, }, } for id, c := range cases { t.Logf("** Executing test case '%s' **", id) b := bucket{ - now: now, + manager: m, deleteAfter: time.Minute, - requests: c.requests, + request: c.request, checked: c.checked, } if c.want != b.needToDelete() { @@ -171,36 +175,40 @@ func TestNeedToDelete(t *testing.T) { func TestNeedToSync(t *testing.T) { now := func() time.Time { return time.Unix(1521221450, 0) } + m := &Manager{now: now} cases := map[string]struct { - requests []*Request - synced time.Time - want bool + request *Request + synced time.Time + want bool }{ "empty": { - want: true, + request: &Request{}, + want: true, }, "recently synced": { - synced: now(), - want: false, + request: &Request{}, + synced: now(), + want: false, }, "not recently synced": { - synced: now().Add(-time.Hour), - want: true, + request: &Request{}, + synced: now().Add(-time.Hour), + want: true, }, "has pending requests": { - synced: now(), - requests: []*Request{}, - want: true, + request: &Request{Weight: 1}, + synced: now(), + want: true, }, } for id, c := range cases { t.Logf("** Executing test case '%s' **", id) b := bucket{ - now: now, + manager: m, refreshAfter: time.Minute, - requests: c.requests, + request: c.request, synced: c.synced, } if c.want != b.needToSync() { diff --git a/adapter/quota/manager.go b/adapter/quota/manager.go index 5f94025..dbeab89 100644 --- a/adapter/quota/manager.go +++ b/adapter/quota/manager.go @@ -33,6 +33,7 @@ const ( defaultRefreshAfter = 1 * time.Minute defaultDeleteAfter = 10 * time.Minute syncQueueSize = 100 + resultCacheBufferSize = 30 ) // A Manager tracks multiple Apigee quotas @@ -48,6 +49,8 @@ type Manager struct { buckets map[string]*bucket // Map from ID -> bucket syncQueue chan *bucket numSyncWorkers int + dupCache ResultCache + syncingBuckets map[*bucket]struct{} } // NewManager constructs and starts a new Manager. Call Close when done. @@ -72,6 +75,8 @@ func newManager(baseURL *url.URL, client *http.Client) *Manager { syncQueue: make(chan *bucket, syncQueueSize), baseURL: baseURL, numSyncWorkers: defaultNumSyncWorkers, + dupCache: ResultCache{size: resultCacheBufferSize}, + syncingBuckets: map[*bucket]struct{}{}, } } @@ -108,36 +113,54 @@ func (m *Manager) Close() { // Apply a quota request to the local quota bucket and schedule for sync func (m *Manager) Apply(auth *auth.Context, p *product.APIProduct, args adapter.QuotaArgs) (*Result, error) { + + if result := m.dupCache.Get(args.DeduplicationID); result != nil { + return result, nil + } + quotaID := fmt.Sprintf("%s-%s", auth.Application, p.Name) req := &Request{ - Identifier: quotaID, - Weight: args.QuotaAmount, - Interval: p.QuotaIntervalInt, - Allow: p.QuotaLimitInt, - TimeUnit: p.QuotaTimeUnit, - DeduplicationID: args.DeduplicationID, + Identifier: quotaID, + Weight: args.QuotaAmount, + Interval: p.QuotaIntervalInt, + Allow: p.QuotaLimitInt, + TimeUnit: p.QuotaTimeUnit, } + // a new bucket is created if missing or if product is no longer compatible + var result *Result + var err error + forceSync := false m.bucketsLock.RLock() - b, existingBucket := m.buckets[quotaID] - if !existingBucket || !b.isCompatible(req) { - m.bucketsLock.RUnlock() + b, ok := m.buckets[quotaID] + m.bucketsLock.RUnlock() + if !ok || !b.compatible(req) { m.bucketsLock.Lock() - b, existingBucket = m.buckets[quotaID] - if !existingBucket || !b.isCompatible(req) { - b = newBucket(req, m, auth) + b, ok = m.buckets[quotaID] + if !ok || !b.compatible(req) { + forceSync = true + b = newBucket(*req, m, auth) + m.syncingBuckets[b] = struct{}{} + defer delete(m.syncingBuckets, b) m.buckets[quotaID] = b + m.log.Debugf("new quota bucket: %s", quotaID) } m.bucketsLock.Unlock() - m.bucketsLock.RLock() } - m.bucketsLock.RUnlock() - if !existingBucket { - b.sync(m) // force sync for new bucket + + if forceSync { + err = b.sync() // force sync for new bucket + result = b.result + } else { + result, err = b.apply(req) + } + + if result != nil && err == nil && args.DeduplicationID != "" { + m.dupCache.Add(args.DeduplicationID, result) } - return b.apply(m, req) + return result, err } // loop to sync active buckets and deletes old buckets @@ -177,9 +200,13 @@ func (m *Manager) syncLoop() { // worker routine for syncing a bucket with the server func (m *Manager) syncBucketWorker() { for { - bucket, more := <-m.syncQueue - if more { - bucket.sync(m) + bucket, ok := <-m.syncQueue + if ok { + if _, ok := m.syncingBuckets[bucket]; !ok { + m.syncingBuckets[bucket] = struct{}{} + bucket.sync() + delete(m.syncingBuckets, bucket) + } } else { m.log.Debugf("closing quota sync worker") m.closed <- true diff --git a/adapter/quota/manager_test.go b/adapter/quota/manager_test.go index 61aacb8..8cec044 100644 --- a/adapter/quota/manager_test.go +++ b/adapter/quota/manager_test.go @@ -16,6 +16,7 @@ package quota import ( "encoding/json" + "fmt" "net/http" "net/http/httptest" "reflect" @@ -32,20 +33,16 @@ import ( func TestQuota(t *testing.T) { type testcase struct { - test string - deduplicationID string - want Result + name string + dedupID string + want Result } var m *Manager m.Close() // just to verify it doesn't die here - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - result := Result{} - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(result) - })) - defer ts.Close() + serverResult := Result{} + ts := testServer(&serverResult, time.Now, nil) env := test.NewEnv(t) context := authtest.NewContext(ts.URL, env) @@ -76,24 +73,24 @@ func TestQuota(t *testing.T) { cases := []testcase{ { - test: "first", - deduplicationID: "X", + name: "first", + dedupID: "X", want: Result{ Used: 1, Exceeded: 0, }, }, { - test: "duplicate", - deduplicationID: "X", + name: "duplicate", + dedupID: "X", want: Result{ Used: 1, Exceeded: 0, }, }, { - test: "second", - deduplicationID: "Y", + name: "second", + dedupID: "Y", want: Result{ Used: 1, Exceeded: 1, @@ -102,9 +99,9 @@ func TestQuota(t *testing.T) { } for _, c := range cases { - t.Logf("** Executing test case '%s' **", c.test) + t.Logf("** Executing test case '%s' **", c.name) - args.DeduplicationID = c.deduplicationID + args.DeduplicationID = c.dedupID result, err := m.Apply(authContext, p, args) if err != nil { t.Fatalf("should not get error: %v", err) @@ -117,54 +114,42 @@ func TestQuota(t *testing.T) { } } - // test incompatible bucket - t.Log("** Executing incompatible test case") + // test incompatible product p2 := &product.APIProduct{ QuotaLimitInt: 1, QuotaIntervalInt: 2, QuotaTimeUnit: "second", } c := testcase{ - test: "second", - deduplicationID: "Y", + name: "incompatible", + dedupID: "Z", want: Result{ Used: 1, - Exceeded: 0, + Exceeded: 1, }, } + t.Logf("** Executing test case '%s' **", c.name) + args.DeduplicationID = c.dedupID result, err := m.Apply(authContext, p2, args) if err != nil { t.Fatalf("should not get error: %v", err) } if result.Used != c.want.Used { - t.Errorf("used got: %v, want: %v", result.Used, 0) + t.Errorf("used got: %v, want: %v", result.Used, c.want.Used) } if result.Exceeded != c.want.Exceeded { t.Errorf("exceeded got: %v, want: %v", result.Exceeded, c.want.Exceeded) } - } // not fully determinate, uses delays and background threads func TestSync(t *testing.T) { - now := func() time.Time { return time.Unix(1521221450, 0) } + fakeTime := int64(1521221450) + now := func() time.Time { return time.Unix(fakeTime, 0) } serverResult := Result{} - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - req := Request{} - json.NewDecoder(r.Body).Decode(&req) - serverResult.Allowed = req.Allow - serverResult.Used += req.Weight - if serverResult.Used > serverResult.Allowed { - serverResult.Exceeded = serverResult.Used - serverResult.Allowed - serverResult.Used = serverResult.Allowed - } - serverResult.Timestamp = now().Unix() - serverResult.ExpiryTime = now().Unix() - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(serverResult) - })) + ts := testServer(&serverResult, now, nil) defer ts.Close() env := test.NewEnv(t) @@ -180,14 +165,12 @@ func TestSync(t *testing.T) { } quotaID := "id" - requests := []*Request{ - { - Identifier: quotaID, - Weight: 2, - }, - { - Weight: 1, - }, + request := &Request{ + Identifier: quotaID, + Interval: 1, + TimeUnit: "seconds", + Allow: 1, + Weight: 3, } result := &Result{ Used: 1, @@ -202,26 +185,24 @@ func TestSync(t *testing.T) { syncQueue: make(chan *bucket, 10), baseURL: context.ApigeeBase(), numSyncWorkers: 1, + syncingBuckets: map[*bucket]struct{}{}, } - m.Start(env) - defer m.Close() - b := newBucket(requests[0], m, authContext) - b.lock.Lock() - b.created = now() - b.now = now - b.requests = requests + b := newBucket(*request, m, authContext) + b.checked = now() b.result = result - m.bucketsLock.Lock() m.buckets = map[string]*bucket{quotaID: b} - m.bucketsLock.Unlock() b.refreshAfter = time.Millisecond - b.lock.Unlock() - time.Sleep(15 * time.Millisecond) // allow idle sync + m.Start(env) + defer m.Close() + + fakeTime = fakeTime + 10 + time.Sleep(10 * time.Millisecond) // allow idle sync + b.lock.RLock() - if len(b.requests) != 0 { - t.Errorf("pending requests got: %d, want: %d", len(b.requests), 0) + if b.request.Weight != 0 { + t.Errorf("pending request weight got: %d, want: %d", b.request.Weight, 0) } if !reflect.DeepEqual(*b.result, serverResult) { t.Errorf("result got: %#v, want: %#v", *b.result, serverResult) @@ -229,19 +210,29 @@ func TestSync(t *testing.T) { if b.synced != m.now() { t.Errorf("synced got: %#v, want: %#v", b.synced, m.now()) } + if m.buckets[quotaID] == nil { + t.Errorf("old bucket should not have been deleted") + } b.lock.RUnlock() // do interactive sync req := &Request{ - Allow: 3, - Weight: 2, + Identifier: quotaID, + Interval: 1, + TimeUnit: "seconds", + Allow: 1, + Weight: 2, } - b.apply(m, req) - b.sync(m) + _, err := b.apply(req) + if err != nil { + t.Errorf("should not have received error on apply: %v", err) + } + fakeTime = fakeTime + 10 + b.sync() b.lock.Lock() - if len(b.requests) != 0 { - t.Errorf("pending requests got: %d, want: %d", len(b.requests), 0) + if b.request.Weight != 0 { + t.Errorf("pending request weight got: %d, want: %d", b.request.Weight, 0) } if !reflect.DeepEqual(*b.result, serverResult) { t.Errorf("result got: %#v, want: %#v", *b.result, serverResult) @@ -250,7 +241,7 @@ func TestSync(t *testing.T) { t.Errorf("synced got: %#v, want: %#v", b.synced, m.now()) } - b.deleteAfter = time.Millisecond + fakeTime = fakeTime + 10*60 b.lock.Unlock() time.Sleep(10 * time.Millisecond) // allow background delete m.bucketsLock.RLock() @@ -261,3 +252,199 @@ func TestSync(t *testing.T) { b.refreshAfter = time.Hour } + +func TestDisconnected(t *testing.T) { + now := func() time.Time { return time.Unix(1521221450, 0) } + + errC := &errControl{ + send: 404, + } + serverResult := Result{} + ts := testServer(&serverResult, now, errC) + defer ts.Close() + + env := test.NewEnv(t) + context := authtest.NewContext(ts.URL, env) + context.SetOrganization("org") + context.SetEnvironment("env") + authContext := &auth.Context{ + Context: context, + DeveloperEmail: "email", + Application: "app", + AccessToken: "token", + ClientID: "clientId", + } + + m := &Manager{ + close: make(chan bool), + closed: make(chan bool), + client: http.DefaultClient, + now: now, + syncRate: 2 * time.Millisecond, + syncQueue: make(chan *bucket, 10), + baseURL: context.ApigeeBase(), + numSyncWorkers: 1, + buckets: map[string]*bucket{}, + syncingBuckets: map[*bucket]struct{}{}, + log: env.Logger(), + } + + p := &product.APIProduct{ + QuotaLimitInt: 1, + QuotaIntervalInt: 1, + QuotaTimeUnit: "second", + } + + args := adapter.QuotaArgs{ + QuotaAmount: 1, + BestEffort: true, + } + + _, err := m.Apply(authContext, p, args) + + wantErr := fmt.Sprintf("unable to sync quota app-: bad response (404): error") + if err == nil || err.Error() != wantErr { + t.Errorf("got error: %s, want: %s", err, wantErr) + } + + _, err = m.Apply(authContext, p, args) + if err == nil || err.Error() != wantErr { + t.Errorf("got error: %s, want: %s", err, wantErr) + } + + errC.send = 200 + m.Start(env) + defer m.Close() + time.Sleep(10 * time.Millisecond) // allow sync + + res, err := m.Apply(authContext, p, args) + if err != nil { + t.Fatalf("got error: %s", err) + } + wantResult := Result{ + Allowed: 1, + Used: 1, + Exceeded: 2, + ExpiryTime: now().Unix(), + Timestamp: now().Unix(), + } + if !reflect.DeepEqual(*res, wantResult) { + t.Errorf("result got: %#v, want: %#v", *res, wantResult) + } +} + +func TestWindowExpired(t *testing.T) { + fakeTime := int64(1521221450) + now := func() time.Time { return time.Unix(fakeTime, 0) } + + errC := &errControl{ + send: 200, + } + serverResult := Result{} + ts := testServer(&serverResult, now, errC) + defer ts.Close() + + env := test.NewEnv(t) + context := authtest.NewContext(ts.URL, env) + context.SetOrganization("org") + context.SetEnvironment("env") + authContext := &auth.Context{ + Context: context, + DeveloperEmail: "email", + Application: "app", + AccessToken: "token", + ClientID: "clientId", + } + + m := &Manager{ + close: make(chan bool), + closed: make(chan bool), + client: http.DefaultClient, + now: now, + syncRate: 2 * time.Millisecond, + syncQueue: make(chan *bucket, 10), + baseURL: context.ApigeeBase(), + numSyncWorkers: 1, + buckets: map[string]*bucket{}, + syncingBuckets: map[*bucket]struct{}{}, + log: env.Logger(), + } + + p := &product.APIProduct{ + QuotaLimitInt: 1, + QuotaIntervalInt: 1, + QuotaTimeUnit: "second", + } + + args := adapter.QuotaArgs{ + QuotaAmount: 1, + BestEffort: true, + } + + res, err := m.Apply(authContext, p, args) // sync is forced + if err != nil { + t.Errorf("got error: %v", err) + } + + quotaID := fmt.Sprintf("%s-%s", authContext.Application, p.Name) + bucket := m.buckets[quotaID] + + if bucket.request.Weight != 0 { + t.Errorf("got: %d, want: %d", bucket.request.Weight, 0) + } + if res.Used != 1 { + t.Errorf("got: %d, want: %d", res.Used, 1) + } + + fakeTime++ + if !bucket.windowExpired() { + t.Errorf("should be expired") + } + + res, err = m.Apply(authContext, p, args) + if err != nil { + t.Errorf("got error: %v", err) + } + if bucket.request.Weight != 1 { + t.Errorf("got: %d, want: %d", bucket.request.Weight, 1) + } + + err = bucket.sync() // after window expiration, should reset + if err != nil { + t.Errorf("got error: %v", err) + } + if bucket.result.Used != 1 { + t.Errorf("got: %d, want: %d", bucket.result.Used, 1) + } + if bucket.result.Exceeded != 0 { + t.Errorf("got: %d, want: %d", bucket.result.Exceeded, 0) + } +} + +type errControl struct { + send int +} + +func testServer(serverResult *Result, now func() time.Time, errC *errControl) *httptest.Server { + + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if errC != nil && errC.send != 200 { + w.WriteHeader(errC.send) + w.Write([]byte("error")) + return + } + + req := Request{} + json.NewDecoder(r.Body).Decode(&req) + serverResult.Allowed = req.Allow + serverResult.Used += req.Weight + if serverResult.Used > serverResult.Allowed { + serverResult.Exceeded = serverResult.Used - serverResult.Allowed + serverResult.Used = serverResult.Allowed + } + serverResult.Timestamp = now().Unix() + serverResult.ExpiryTime = now().Unix() + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(serverResult) + })) +} diff --git a/adapter/quota/result_cache.go b/adapter/quota/result_cache.go new file mode 100644 index 0000000..c6c24cb --- /dev/null +++ b/adapter/quota/result_cache.go @@ -0,0 +1,46 @@ +package quota + +import ( + "container/list" + "sync" +) + +// ResultCache is a structure to track Results by ID, bounded by size +type ResultCache struct { + size int + lookup map[string]*Result + buffer list.List + lock sync.Mutex +} + +// Add a Result to the cache +func (d *ResultCache) Add(id string, result *Result) { + d.lock.Lock() + defer d.lock.Unlock() + _, ok := d.lookup[id] + if ok { + return + } + if d.lookup == nil { + d.lookup = make(map[string]*Result) + } + d.lookup[id] = result + d.buffer.PushBack(id) + if d.buffer.Len() > d.size { + e := d.buffer.Front() + d.buffer.Remove(e) + delete(d.lookup, e.Value.(string)) + } + return +} + +// Get a Result from the cache, nil if none +func (d *ResultCache) Get(id string) *Result { + d.lock.Lock() + defer d.lock.Unlock() + result, ok := d.lookup[id] + if ok { + return result + } + return nil +} diff --git a/adapter/quota/result_cache_test.go b/adapter/quota/result_cache_test.go new file mode 100644 index 0000000..46a3e76 --- /dev/null +++ b/adapter/quota/result_cache_test.go @@ -0,0 +1,35 @@ +package quota + +import "testing" + +func TestResultCache(t *testing.T) { + results := ResultCache{ + size: 2, + } + + tests := []struct { + add string + exists []string + notExists []string + }{ + {"test1", []string{"test1"}, []string{""}}, + {"test2", []string{"test1", "test2"}, []string{""}}, + {"test3", []string{"test2", "test3"}, []string{"test1"}}, + {"test1", []string{"test1", "test3"}, []string{"test2"}}, + {"test2", []string{"test1", "test2"}, []string{"test3"}}, + } + + for i, test := range tests { + results.Add(test.add, &Result{}) + for _, id := range test.exists { + if results.Get(id) == nil { + t.Errorf("test[%d] %s value %s should exist", i, test.add, id) + } + } + for _, id := range test.notExists { + if results.Get(id) != nil { + t.Errorf("test[%d] %s value %s should not exist", i, test.add, id) + } + } + } +} diff --git a/adapter/quota/structs.go b/adapter/quota/structs.go index 1a34010..16e4aad 100644 --- a/adapter/quota/structs.go +++ b/adapter/quota/structs.go @@ -14,14 +14,15 @@ package quota +import "time" + // A Request is sent to Apigee's quota server to allocate quota. type Request struct { - Identifier string `json:"identifier"` - Weight int64 `json:"weight"` - Interval int64 `json:"interval"` - Allow int64 `json:"allow"` - TimeUnit string `json:"timeUnit"` - DeduplicationID string `json:"-"` // for Istio, not Apigee + Identifier string `json:"identifier"` + Weight int64 `json:"weight"` + Interval int64 `json:"interval"` + Allow int64 `json:"allow"` + TimeUnit string `json:"timeUnit"` } // A Result is a response from Apigee's quota server that gives information @@ -34,3 +35,7 @@ type Result struct { ExpiryTime int64 `json:"expiryTime"` Timestamp int64 `json:"timestamp"` } + +func (r *Result) expiredAt(tm time.Time) bool { + return time.Unix(r.ExpiryTime, 0).After(tm) +}