Skip to content

Commit

Permalink
Add funcSvcGroup type in pool cache for grouping of function services (
Browse files Browse the repository at this point in the history
…#2728)

* add specfic structs for function svc and group

* remove unused var in funcSvcGroup

* format poolcache
  • Loading branch information
pranoyk committed Feb 28, 2023
1 parent 1cb18a7 commit 0de8923
Showing 1 changed file with 37 additions and 28 deletions.
65 changes: 37 additions & 28 deletions pkg/executor/fscache/poolcache.go
Expand Up @@ -39,17 +39,21 @@ const (
)

type (
// value used as "value" in cache
value struct {
funcSvcInfo struct {
val *FuncSvc
activeRequests int // number of requests served by function pod
currentCPUUsage resource.Quantity // current cpu usage of the specialized function pod
cpuLimit resource.Quantity // if currentCPUUsage is more than cpuLimit cache miss occurs in getValue request
}

funcSvcGroup struct {
svcs map[string]*funcSvcInfo
}

// PoolCache implements a simple cache implementation having values mapped by two keys [function][address].
// As of now PoolCache is only used by poolmanager executor
PoolCache struct {
cache map[string]map[string]*value
cache map[string]*funcSvcGroup
requestChannel chan *request
logger *zap.Logger
}
Expand All @@ -76,7 +80,7 @@ type (

func NewPoolCache(logger *zap.Logger) *PoolCache {
c := &PoolCache{
cache: make(map[string]map[string]*value),
cache: make(map[string]*funcSvcGroup),
requestChannel: make(chan *request),
logger: logger,
}
Expand All @@ -90,47 +94,50 @@ func (c *PoolCache) service() {
resp := &response{}
switch req.requestType {
case getValue:
values, ok := c.cache[req.function]
funcSvcGroup, ok := c.cache[req.function]
found := false
if !ok {
resp.error = ferror.MakeError(ferror.ErrorNotFound,
fmt.Sprintf("function Name '%v' not found", req.function))
} else {
for addr := range values {
if values[addr].activeRequests < req.requestsPerPod && values[addr].currentCPUUsage.Cmp(values[addr].cpuLimit) < 1 {
for addr := range funcSvcGroup.svcs {
if funcSvcGroup.svcs[addr].activeRequests < req.requestsPerPod &&
funcSvcGroup.svcs[addr].currentCPUUsage.Cmp(funcSvcGroup.svcs[addr].cpuLimit) < 1 {
// mark active
values[addr].activeRequests++
funcSvcGroup.svcs[addr].activeRequests++
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with getValue", zap.String("function", req.function), zap.String("address", addr), zap.Int("activeRequests", values[addr].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with getValue", zap.String("function", req.function), zap.String("address", addr), zap.Int("activeRequests", funcSvcGroup.svcs[addr].activeRequests))
}
resp.value = values[addr].val
resp.value = funcSvcGroup.svcs[addr].val
found = true
break
}
}
if !found {
resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%v' all functions are busy", req.function))
}
resp.totalActive = len(values)
resp.totalActive = len(funcSvcGroup.svcs)
}
req.responseChannel <- resp
case setValue:
if _, ok := c.cache[req.function]; !ok {
c.cache[req.function] = make(map[string]*value)
c.cache[req.function] = &funcSvcGroup{
svcs: make(map[string]*funcSvcInfo),
}
}
if _, ok := c.cache[req.function][req.address]; !ok {
c.cache[req.function][req.address] = &value{}
if _, ok := c.cache[req.function].svcs[req.address]; !ok {
c.cache[req.function].svcs[req.address] = &funcSvcInfo{}
}
c.cache[req.function][req.address].val = req.value
c.cache[req.function][req.address].activeRequests++
c.cache[req.function].svcs[req.address].val = req.value
c.cache[req.function].svcs[req.address].activeRequests++
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with setValue", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with setValue", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function].svcs[req.address].activeRequests))
}
c.cache[req.function][req.address].cpuLimit = req.cpuUsage
c.cache[req.function].svcs[req.address].cpuLimit = req.cpuUsage
case listAvailableValue:
vals := make([]*FuncSvc, 0)
for key1, values := range c.cache {
for key2, value := range values {
for key2, value := range values.svcs {
debugLevel := c.logger.Core().Enabled(zap.DebugLevel)
if debugLevel {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Reading active requests", zap.String("function", key1), zap.String("address", key2), zap.Int("activeRequests", value.activeRequests))
Expand All @@ -147,26 +154,28 @@ func (c *PoolCache) service() {
req.responseChannel <- resp
case setCPUUtilization:
if _, ok := c.cache[req.function]; !ok {
c.cache[req.function] = make(map[string]*value)
c.cache[req.function] = &funcSvcGroup{
svcs: make(map[string]*funcSvcInfo),
}
}
if _, ok := c.cache[req.function][req.address]; ok {
c.cache[req.function][req.address].currentCPUUsage = req.cpuUsage
if _, ok := c.cache[req.function].svcs[req.address]; ok {
c.cache[req.function].svcs[req.address].currentCPUUsage = req.cpuUsage
}
case markAvailable:
if _, ok := c.cache[req.function]; ok {
if _, ok = c.cache[req.function][req.address]; ok {
if c.cache[req.function][req.address].activeRequests > 0 {
c.cache[req.function][req.address].activeRequests--
if _, ok = c.cache[req.function].svcs[req.address]; ok {
if c.cache[req.function].svcs[req.address].activeRequests > 0 {
c.cache[req.function].svcs[req.address].activeRequests--
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function].svcs[req.address].activeRequests))
}
} else {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Error("Invalid request to decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Error("Invalid request to decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function].svcs[req.address].activeRequests))
}
}
}
case deleteValue:
delete(c.cache[req.function], req.address)
delete(c.cache[req.function].svcs, req.address)
req.responseChannel <- resp
default:
resp.error = ferror.MakeError(ferror.ErrorInvalidArgument,
Expand Down

0 comments on commit 0de8923

Please sign in to comment.