Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add funcSvcGroup type in pool cache for grouping of function services #2728

Merged
merged 3 commits into from Feb 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 37 additions & 28 deletions pkg/executor/fscache/poolcache.go
Expand Up @@ -39,17 +39,21 @@ const (
)

type (
// value used as "value" in cache
value struct {
funcSvcInfo struct {
val *FuncSvc
activeRequests int // number of requests served by function pod
currentCPUUsage resource.Quantity // current cpu usage of the specialized function pod
cpuLimit resource.Quantity // if currentCPUUsage is more than cpuLimit cache miss occurs in getValue request
}

funcSvcGroup struct {
svcs map[string]*funcSvcInfo
}

// PoolCache implements a simple cache implementation having values mapped by two keys [function][address].
// As of now PoolCache is only used by poolmanager executor
PoolCache struct {
cache map[string]map[string]*value
cache map[string]*funcSvcGroup
requestChannel chan *request
logger *zap.Logger
}
Expand All @@ -76,7 +80,7 @@ type (

func NewPoolCache(logger *zap.Logger) *PoolCache {
c := &PoolCache{
cache: make(map[string]map[string]*value),
cache: make(map[string]*funcSvcGroup),
requestChannel: make(chan *request),
logger: logger,
}
Expand All @@ -90,47 +94,50 @@ func (c *PoolCache) service() {
resp := &response{}
switch req.requestType {
case getValue:
values, ok := c.cache[req.function]
funcSvcGroup, ok := c.cache[req.function]
found := false
if !ok {
resp.error = ferror.MakeError(ferror.ErrorNotFound,
fmt.Sprintf("function Name '%v' not found", req.function))
} else {
for addr := range values {
if values[addr].activeRequests < req.requestsPerPod && values[addr].currentCPUUsage.Cmp(values[addr].cpuLimit) < 1 {
for addr := range funcSvcGroup.svcs {
if funcSvcGroup.svcs[addr].activeRequests < req.requestsPerPod &&
funcSvcGroup.svcs[addr].currentCPUUsage.Cmp(funcSvcGroup.svcs[addr].cpuLimit) < 1 {
// mark active
values[addr].activeRequests++
funcSvcGroup.svcs[addr].activeRequests++
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with getValue", zap.String("function", req.function), zap.String("address", addr), zap.Int("activeRequests", values[addr].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with getValue", zap.String("function", req.function), zap.String("address", addr), zap.Int("activeRequests", funcSvcGroup.svcs[addr].activeRequests))
}
resp.value = values[addr].val
resp.value = funcSvcGroup.svcs[addr].val
found = true
break
}
}
if !found {
resp.error = ferror.MakeError(ferror.ErrorNotFound, fmt.Sprintf("function '%v' all functions are busy", req.function))
}
resp.totalActive = len(values)
resp.totalActive = len(funcSvcGroup.svcs)
}
req.responseChannel <- resp
case setValue:
if _, ok := c.cache[req.function]; !ok {
c.cache[req.function] = make(map[string]*value)
c.cache[req.function] = &funcSvcGroup{
svcs: make(map[string]*funcSvcInfo),
}
}
if _, ok := c.cache[req.function][req.address]; !ok {
c.cache[req.function][req.address] = &value{}
if _, ok := c.cache[req.function].svcs[req.address]; !ok {
c.cache[req.function].svcs[req.address] = &funcSvcInfo{}
}
c.cache[req.function][req.address].val = req.value
c.cache[req.function][req.address].activeRequests++
c.cache[req.function].svcs[req.address].val = req.value
c.cache[req.function].svcs[req.address].activeRequests++
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with setValue", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with setValue", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function].svcs[req.address].activeRequests))
}
c.cache[req.function][req.address].cpuLimit = req.cpuUsage
c.cache[req.function].svcs[req.address].cpuLimit = req.cpuUsage
case listAvailableValue:
vals := make([]*FuncSvc, 0)
for key1, values := range c.cache {
for key2, value := range values {
for key2, value := range values.svcs {
debugLevel := c.logger.Core().Enabled(zap.DebugLevel)
if debugLevel {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Reading active requests", zap.String("function", key1), zap.String("address", key2), zap.Int("activeRequests", value.activeRequests))
Expand All @@ -147,26 +154,28 @@ func (c *PoolCache) service() {
req.responseChannel <- resp
case setCPUUtilization:
if _, ok := c.cache[req.function]; !ok {
c.cache[req.function] = make(map[string]*value)
c.cache[req.function] = &funcSvcGroup{
svcs: make(map[string]*funcSvcInfo),
}
}
if _, ok := c.cache[req.function][req.address]; ok {
c.cache[req.function][req.address].currentCPUUsage = req.cpuUsage
if _, ok := c.cache[req.function].svcs[req.address]; ok {
c.cache[req.function].svcs[req.address].currentCPUUsage = req.cpuUsage
}
case markAvailable:
if _, ok := c.cache[req.function]; ok {
if _, ok = c.cache[req.function][req.address]; ok {
if c.cache[req.function][req.address].activeRequests > 0 {
c.cache[req.function][req.address].activeRequests--
if _, ok = c.cache[req.function].svcs[req.address]; ok {
if c.cache[req.function].svcs[req.address].activeRequests > 0 {
c.cache[req.function].svcs[req.address].activeRequests--
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function].svcs[req.address].activeRequests))
}
} else {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Error("Invalid request to decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Error("Invalid request to decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function].svcs[req.address].activeRequests))
}
}
}
case deleteValue:
delete(c.cache[req.function], req.address)
delete(c.cache[req.function].svcs, req.address)
req.responseChannel <- resp
default:
resp.error = ferror.MakeError(ferror.ErrorInvalidArgument,
Expand Down