Use concurrency in poolmanager as per old behaviour (#2876)
* Use concurrency in poolmanager as per old behaviour
* Update code comments

---------

Signed-off-by: Sanket Sudake <sanketsudake@gmail.com>
sanketsudake committed Nov 21, 2023
1 parent d23ed57 commit b85ba9e
Showing 2 changed files with 33 additions and 15 deletions.
21 changes: 16 additions & 5 deletions pkg/executor/fscache/poolcache.go
@@ -96,7 +96,6 @@ type (
 )
 
 // NewPoolCache create a Cache object
-
 func NewPoolCache(logger *zap.Logger) *PoolCache {
 	c := &PoolCache{
 		cache: make(map[crd.CacheKeyURG]*funcSvcGroup),
@@ -122,6 +121,7 @@ func (c *PoolCache) service() {
 		case getValue:
 			funcSvcGroup, ok := c.cache[req.function]
 			if !ok {
+				// first request for this function, create a new group
 				c.cache[req.function] = NewFuncSvcGroup()
 				c.cache[req.function].svcWaiting++
 				resp.error = ferror.MakeError(ferror.ErrorNotFound,
@@ -131,6 +131,7 @@
 			}
 			found := false
 			totalActiveRequests := 0
+			// check if any specialized pod is available
 			for addr := range funcSvcGroup.svcs {
 				totalActiveRequests += funcSvcGroup.svcs[addr].activeRequests
 				if funcSvcGroup.svcs[addr].activeRequests < req.requestsPerPod &&
@@ -145,12 +146,22 @@
 					break
 				}
 			}
+			// if a specialized pod is available then return its svc
 			if found {
 				req.responseChannel <- resp
 				continue
 			}
-			specializationInProgress := funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len()
-			capacity := ((specializationInProgress + len(funcSvcGroup.svcs)) * req.requestsPerPod) - (totalActiveRequests + funcSvcGroup.svcWaiting)
+			concurrencyUsed := len(funcSvcGroup.svcs) + (funcSvcGroup.svcWaiting - funcSvcGroup.queue.Len())
+			// if concurrency is available then be aggressive and use it, since we are not sure specialization will complete for the other requests
+			if req.concurrency > 0 && concurrencyUsed < req.concurrency {
+				funcSvcGroup.svcWaiting++
+				resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%s' not found", req.function))
+				req.responseChannel <- resp
+				continue
+			}
+			// if no concurrency is available, check whether the existing pods have any virtual capacity to serve the request in the future
+			// if specialization doesn't complete in time, the request will time out
+			capacity := (concurrencyUsed * req.requestsPerPod) - (totalActiveRequests + funcSvcGroup.svcWaiting)
 			if capacity > 0 {
 				funcSvcGroup.svcWaiting++
 				svcWait := &svcWait{
@@ -164,8 +175,8 @@
 			}
 
 			// concurrency should not be set to zero and
-			//sum of specialization in progress and specialized pods should be less then req.concurrency
-			if req.concurrency > 0 && (specializationInProgress+len(funcSvcGroup.svcs)) >= req.concurrency {
+			// sum of specialization in progress and specialized pods should be less than req.concurrency
+			if req.concurrency > 0 && concurrencyUsed >= req.concurrency {
 				resp.error = ferror.MakeError(ferror.ErrorTooManyRequests, fmt.Sprintf("function '%s' concurrency '%d' limit reached.", req.function, req.concurrency))
 			} else {
 				funcSvcGroup.svcWaiting++
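
For readers tracing the new getValue flow above: a request first tries to reuse a specialized pod that still has a free requestsPerPod slot; failing that, it consumes unused concurrency by triggering another specialization; failing that, it queues against the virtual capacity of pods that already exist or are being specialized; only then is it rejected with ErrorTooManyRequests. The sketch below restates that ordering as a standalone Go program. It is an illustration only, not Fission code: poolState, decision, and decide are invented names, and the real logic runs inside the cache's single service goroutine and replies over response channels.

package main

import "fmt"

// poolState is a hypothetical snapshot of one function's service group,
// mirroring the quantities the real service() loop works with.
type poolState struct {
	specializedPods     int // len(funcSvcGroup.svcs)
	totalActiveRequests int // sum of activeRequests across pods
	svcWaiting          int // requests waiting for a pod, including queued ones
	queueLen            int // requests parked in the wait queue
}

type decision string

const (
	useExistingPod  decision = "use an existing specialized pod"
	specializeNew   decision = "specialize a new pod (ErrorNotFound path)"
	queueForVirtual decision = "queue against virtual capacity of known pods"
	tooManyRequests decision = "reject with ErrorTooManyRequests"
)

// decide reproduces the order of checks introduced in this commit:
// 1) reuse a pod with a spare requestsPerPod slot,
// 2) otherwise consume unused concurrency aggressively,
// 3) otherwise queue if virtual capacity remains,
// 4) otherwise reject or specialize depending on the concurrency limit.
func decide(s poolState, requestsPerPod, concurrency int, podHasSpareSlot bool) decision {
	if podHasSpareSlot {
		return useExistingPod
	}
	// pods already specialized plus specializations still in flight
	concurrencyUsed := s.specializedPods + (s.svcWaiting - s.queueLen)
	if concurrency > 0 && concurrencyUsed < concurrency {
		return specializeNew
	}
	capacity := (concurrencyUsed * requestsPerPod) - (s.totalActiveRequests + s.svcWaiting)
	if capacity > 0 {
		return queueForVirtual
	}
	if concurrency > 0 && concurrencyUsed >= concurrency {
		return tooManyRequests
	}
	return specializeNew
}

func main() {
	// One full pod (5 active requests at requestsPerPod=5) plus one
	// specialization already in flight; the concurrency limit is 5.
	s := poolState{specializedPods: 1, totalActiveRequests: 5, svcWaiting: 1, queueLen: 0}
	fmt.Println(decide(s, 5, 5, false))
	// Output: specialize a new pod (ErrorNotFound path)
}

With the removed capacity-first ordering, this example request would instead have queued against virtual capacity (((1+1)*5)-(5+1) = 4 > 0); checking concurrencyUsed against the limit first starts another specialization while concurrency remains, which is the "old behaviour" the commit title refers to.
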
27 changes: 17 additions & 10 deletions pkg/executor/fscache/poolcache_test.go
@@ -224,17 +224,19 @@ func TestPoolCacheRequests(t *testing.T) {
 				simultaneous = 1
 			}
 			for i := 1; i <= tt.requests; i++ {
+				reqno := i
 				wg.Add(1)
 				go func(reqno int) {
 					defer wg.Done()
 					svc, err := p.GetSvcValue(context.Background(), key, tt.rpp, tt.concurrency)
 					if err != nil {
 						code, _ := ferror.GetHTTPError(err)
 						if code == http.StatusNotFound {
-							p.SetSvcValue(context.Background(), key, fmt.Sprintf("svc-%d", svcCounter), &FuncSvc{
-								Name: "value",
-							}, resource.MustParse("45m"), tt.rpp, tt.retainPods)
 							atomic.AddUint64(&svcCounter, 1)
+							address := fmt.Sprintf("svc-%d", atomic.LoadUint64(&svcCounter))
+							p.SetSvcValue(context.Background(), key, address, &FuncSvc{
+								Name: address,
+							}, resource.MustParse("45m"), tt.rpp, tt.retainPods)
 						} else {
 							t.Log(reqno, "=>", err)
 							atomic.AddUint64(&failedRequests, 1)
@@ -244,9 +246,12 @@
 							t.Log(reqno, "=>", "svc is nil")
 							atomic.AddUint64(&failedRequests, 1)
 						}
+						// } else {
+						// 	t.Log(reqno, "=>", svc.Name)
+						// }
 					}
-				}(i)
-				if i%simultaneous == 0 {
+				}(reqno)
+				if reqno%simultaneous == 0 {
 					wg.Wait()
 				}
 			}
@@ -258,10 +263,11 @@
 			for i := 0; i < tt.concurrency; i++ {
 				for j := 0; j < tt.rpp; j++ {
 					wg.Add(1)
-					go func(i int) {
+					svcno := i
+					go func(svcno int) {
 						defer wg.Done()
-						p.MarkAvailable(key, fmt.Sprintf("svc-%d", i))
-					}(i)
+						p.MarkAvailable(key, fmt.Sprintf("svc-%d", svcno+1))
+					}(svcno)
 				}
 			}
 			wg.Wait()
@@ -270,8 +276,9 @@
 				UID:        "func",
 				Generation: 2,
 			}
-			p.SetSvcValue(context.Background(), newKey, fmt.Sprintf("svc-%d", svcCounter), &FuncSvc{
-				Name: "value",
+			address := fmt.Sprintf("svc-%d", svcCounter)
+			p.SetSvcValue(context.Background(), newKey, address, &FuncSvc{
+				Name: address,
 			}, resource.MustParse("45m"), tt.rpp, tt.retainPods)
 			funcSvc := p.ListAvailableValue()
 			require.Equal(t, tt.concurrency, len(funcSvc))
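
The updated test exercises this cache concurrently: each of tt.requests goroutines calls GetSvcValue, answers a StatusNotFound error by registering a freshly specialized address with SetSvcValue (now named svc-1, svc-2, ... because svcCounter is incremented before the name is formatted), and the follow-up loop releases concurrency x rpp slots with MarkAvailable using the matching svc-(i+1) names. The sketch below lays out that request/register/release cycle sequentially for a single request. It is illustrative only: it is written as if it sat next to the test in package fscache, reusing the test file's imports; poolCacheUsageSketch is an invented name, and the literal arguments (requestsPerPod 5, concurrency 5, and a final retainPods value of 0, assumed to be an integer count like tt.retainPods) are placeholders.

// poolCacheUsageSketch is a hypothetical helper, not part of the test suite.
func poolCacheUsageSketch(logger *zap.Logger) {
	p := NewPoolCache(logger)
	key := crd.CacheKeyURG{UID: "func", Generation: 1}

	// 1. Ask for a specialized pod; ErrorNotFound means "none yet, go specialize one".
	svc, err := p.GetSvcValue(context.Background(), key, 5, 5)
	if err != nil {
		if code, _ := ferror.GetHTTPError(err); code == http.StatusNotFound {
			// 2. Register the newly specialized pod under a unique address,
			// matching the svc-1, svc-2, ... naming the updated test uses.
			p.SetSvcValue(context.Background(), key, "svc-1",
				&FuncSvc{Name: "svc-1"}, resource.MustParse("45m"), 5, 0)
		}
		return
	}

	// 3. Serve the request with svc, then release the slot so a later
	// GetSvcValue can reuse this pod instead of specializing another.
	_ = svc
	p.MarkAvailable(key, "svc-1")
}
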
