Skip to content

Commit

Permalink
Delay key proc sampler (#160)
Browse files Browse the repository at this point in the history
Add unit test for newV3KeyProcessor
  • Loading branch information
kramvan1 authored Dec 14, 2023
1 parent 0f271ea commit cf37323
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
2 changes: 1 addition & 1 deletion rules/key_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ func (v3kp *v3KeyProcessor) processKey(key string, value *string, api readAPI, l

func (v3kp *v3KeyProcessor) bufferCapacitySampler(logger *zap.Logger) {
for {
time.Sleep(time.Minute)
remainingBuffer := cap(v3kp.kpChannel) - len(v3kp.kpChannel)
metrics.KeyProcessBufferCap(remainingBuffer)
currentHour := time.Now().UTC().Hour()
if (float32(remainingBuffer)/float32(cap(v3kp.kpChannel))) < 0.05 && v3kp.lastNotified != currentHour {
logger.Warn("Rules engine buffer is near capacity", zap.Int("capacity", cap(v3kp.kpChannel)), zap.Int("remaining", remainingBuffer))
v3kp.lastNotified = currentHour
}
time.Sleep(time.Minute)
}
}

Expand Down
24 changes: 24 additions & 0 deletions rules/key_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rules

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
Expand Down Expand Up @@ -77,3 +78,26 @@ func TestV3KeyProcessor(t *testing.T) {
work := <-channel
assert.Equal(t, "/test/lock/key", work.lockKey)
}

func TestNewV3KeyProcessor(t *testing.T) {
value := "value"
rule, err := NewEqualsLiteralRule("/test/:key", &value)
assert.NoError(t, err)
rm := newRuleManager(map[string]constraint{}, false)
rm.addRule(rule)
api := newMapReadAPI()
api.put("/test/key", value)

channel := make(chan v3RuleWork)
kpChannel := make(chan *keyTask, 1000)
logger := getTestLogger()
kp := newV3KeyProcessor(channel, &rm, kpChannel, 1, logger)
kp.setCallback(0, V3RuleTaskCallback(v3DummyCallback))
kp.setContextProvider(0, defaultContextProvider)
kp.setRuleID(0, "testKey")
kp.setLockKeyPattern(0, "/test/lock/:key")
go kp.processKey("/test/key", &value, api, logger, map[string]string{}, nil)
time.Sleep(time.Second)
work := <-channel
assert.Equal(t, "/test/lock/key", work.lockKey)
}

0 comments on commit cf37323

Please sign in to comment.