Skip to content

Commit

Permalink
internal: Making poolcache typed and merged into fscache (#2693)
Browse files Browse the repository at this point in the history
Merged pool cache package into fscache to avoid import cycle.
Also changed all types in pool cache from interface to specific
types.

Signed-off-by: Sanket Sudake <sanketsudake@gmail.com>
  • Loading branch information
sanketsudake committed Jan 15, 2023
1 parent deb3523 commit 0edf264
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 54 deletions.
24 changes: 11 additions & 13 deletions pkg/executor/fscache/functionServiceCache.go
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/fission/fission/pkg/crd"
ferror "github.com/fission/fission/pkg/error"
"github.com/fission/fission/pkg/executor/metrics"
"github.com/fission/fission/pkg/poolcache"
)

type fscRequestType int
Expand Down Expand Up @@ -68,12 +67,12 @@ type (
// FunctionServiceCache represents the function service cache
FunctionServiceCache struct {
logger *zap.Logger
byFunction *cache.Cache // function-key -> funcSvc : map[string]*funcSvc
byAddress *cache.Cache // address -> function : map[string]metav1.ObjectMeta
byFunctionUID *cache.Cache // function uid -> function : map[string]metav1.ObjectMeta
connFunctionCache *poolcache.Cache // function-key -> funcSvc : map[string]*funcSvc
PodToFsvc sync.Map // pod-name -> funcSvc: map[string]*FuncSvc
WebsocketFsvc sync.Map // funcSvc-name -> bool: map[string]bool
byFunction *cache.Cache // function-key -> funcSvc : map[string]*funcSvc
byAddress *cache.Cache // address -> function : map[string]metav1.ObjectMeta
byFunctionUID *cache.Cache // function uid -> function : map[string]metav1.ObjectMeta
connFunctionCache *PoolCache // function-key -> funcSvc : map[string]*funcSvc
PodToFsvc sync.Map // pod-name -> funcSvc: map[string]*FuncSvc
WebsocketFsvc sync.Map // funcSvc-name -> bool: map[string]bool
requestChannel chan *fscRequest
}

Expand Down Expand Up @@ -113,7 +112,7 @@ func MakeFunctionServiceCache(logger *zap.Logger) *FunctionServiceCache {
byFunction: cache.MakeCache(0, 0),
byAddress: cache.MakeCache(0, 0),
byFunctionUID: cache.MakeCache(0, 0),
connFunctionCache: poolcache.NewPoolCache(logger.Named("conn_function_cache")),
connFunctionCache: NewPoolCache(logger.Named("conn_function_cache")),
requestChannel: make(chan *fscRequest),
}
go fsc.service()
Expand Down Expand Up @@ -159,8 +158,8 @@ func (fsc *FunctionServiceCache) service() {
case LISTOLDPOOL:
fscs := fsc.connFunctionCache.ListAvailableValue()
funcObjects := make([]*FuncSvc, 0)
for _, funcSvc := range fscs {
if fsvc, ok := funcSvc.(*FuncSvc); ok && time.Since(fsvc.Atime) > req.age {
for _, fsvc := range fscs {
if time.Since(fsvc.Atime) > req.age {
funcObjects = append(funcObjects, fsvc)
}
}
Expand Down Expand Up @@ -192,14 +191,13 @@ func (fsc *FunctionServiceCache) GetByFunction(m *metav1.ObjectMeta) (*FuncSvc,
func (fsc *FunctionServiceCache) GetFuncSvc(ctx context.Context, m *metav1.ObjectMeta, requestsPerPod int) (*FuncSvc, int, error) {
key := crd.CacheKey(m)

fsvcI, active, err := fsc.connFunctionCache.GetValue(ctx, key, requestsPerPod)
fsvc, active, err := fsc.connFunctionCache.GetSvcValue(ctx, key, requestsPerPod)
if err != nil {
fsc.logger.Info("Not found in Cache")
return nil, active, err
}

// update atime
fsvc := fsvcI.(*FuncSvc)
fsvc.Atime = time.Now()

fsvcCopy := *fsvc
Expand Down Expand Up @@ -230,7 +228,7 @@ func (fsc *FunctionServiceCache) GetByFunctionUID(uid types.UID) (*FuncSvc, erro

// AddFunc adds a function service to pool cache.
func (fsc *FunctionServiceCache) AddFunc(ctx context.Context, fsvc FuncSvc) {
fsc.connFunctionCache.SetValue(ctx, crd.CacheKey(fsvc.Function), fsvc.Address, &fsvc, fsvc.CPULimit)
fsc.connFunctionCache.SetSvcValue(ctx, crd.CacheKey(fsvc.Function), fsvc.Address, &fsvc, fsvc.CPULimit)
now := time.Now()
fsvc.Ctime = now
fsvc.Atime = now
Expand Down
64 changes: 32 additions & 32 deletions pkg/poolcache/poolcache.go → pkg/executor/fscache/poolcache.go
Expand Up @@ -14,9 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package poolcache implements a simple cache implementation having values mapped by two keys.
// As of now this package is only used by poolmanager executor
package poolcache
package fscache

import (
"context"
Expand All @@ -43,48 +41,50 @@ const (
type (
// value used as "value" in cache
value struct {
val interface{}
val *FuncSvc
activeRequests int // number of requests served by function pod
currentCPUUsage resource.Quantity // current cpu usage of the specialized function pod
cpuLimit resource.Quantity // if currentCPUUsage is more than cpuLimit cache miss occurs in getValue request
}
// Cache is simple cache having two keys [function][address] mapped to value and requestChannel for operation on it
Cache struct {
cache map[interface{}]map[interface{}]*value
// PoolCache implements a simple cache implementation having values mapped by two keys [function][address].
// As of now PoolCache is only used by poolmanager executor
PoolCache struct {
cache map[string]map[string]*value
requestChannel chan *request
logger *zap.Logger
}

request struct {
requestType
ctx context.Context
function interface{}
address interface{}
value interface{}
function string
address string
value *FuncSvc
requestsPerPod int
cpuUsage resource.Quantity
responseChannel chan *response
}
response struct {
error
allValues []interface{}
value interface{}
allValues []*FuncSvc
value *FuncSvc
totalActive int
}
)

// NewPoolCache create a Cache object
func NewPoolCache(logger *zap.Logger) *Cache {
c := &Cache{
cache: make(map[interface{}]map[interface{}]*value),

func NewPoolCache(logger *zap.Logger) *PoolCache {
c := &PoolCache{
cache: make(map[string]map[string]*value),
requestChannel: make(chan *request),
logger: logger,
}
go c.service()
return c
}

func (c *Cache) service() {
func (c *PoolCache) service() {
for {
req := <-c.requestChannel
resp := &response{}
Expand All @@ -101,7 +101,7 @@ func (c *Cache) service() {
// mark active
values[addr].activeRequests++
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with getValue", zap.String("function", req.function.(string)), zap.String("address", addr.(string)), zap.Int("activeRequests", values[addr].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with getValue", zap.String("function", req.function), zap.String("address", addr), zap.Int("activeRequests", values[addr].activeRequests))
}
resp.value = values[addr].val
found = true
Expand All @@ -116,28 +116,28 @@ func (c *Cache) service() {
req.responseChannel <- resp
case setValue:
if _, ok := c.cache[req.function]; !ok {
c.cache[req.function] = make(map[interface{}]*value)
c.cache[req.function] = make(map[string]*value)
}
if _, ok := c.cache[req.function][req.address]; !ok {
c.cache[req.function][req.address] = &value{}
}
c.cache[req.function][req.address].val = req.value
c.cache[req.function][req.address].activeRequests++
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with setValue", zap.String("function", req.function.(string)), zap.String("address", req.address.(string)), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Increase active requests with setValue", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
}
c.cache[req.function][req.address].cpuLimit = req.cpuUsage
case listAvailableValue:
vals := make([]interface{}, 0)
vals := make([]*FuncSvc, 0)
for key1, values := range c.cache {
for key2, value := range values {
debugLevel := c.logger.Core().Enabled(zap.DebugLevel)
if debugLevel {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Reading active requests", zap.String("function", key1.(string)), zap.String("address", key2.(string)), zap.Int("activeRequests", value.activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Reading active requests", zap.String("function", key1), zap.String("address", key2), zap.Int("activeRequests", value.activeRequests))
}
if value.activeRequests == 0 {
if debugLevel {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Function service with no active requests", zap.String("function", key1.(string)), zap.String("address", key2.(string)), zap.Int("activeRequests", value.activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Function service with no active requests", zap.String("function", key1), zap.String("address", key2), zap.Int("activeRequests", value.activeRequests))
}
vals = append(vals, value.val)
}
Expand All @@ -147,7 +147,7 @@ func (c *Cache) service() {
req.responseChannel <- resp
case setCPUUtilization:
if _, ok := c.cache[req.function]; !ok {
c.cache[req.function] = make(map[interface{}]*value)
c.cache[req.function] = make(map[string]*value)
}
if _, ok := c.cache[req.function][req.address]; ok {
c.cache[req.function][req.address].currentCPUUsage = req.cpuUsage
Expand All @@ -158,10 +158,10 @@ func (c *Cache) service() {
if c.cache[req.function][req.address].activeRequests > 0 {
c.cache[req.function][req.address].activeRequests--
if c.logger.Core().Enabled(zap.DebugLevel) {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Decrease active requests", zap.String("function", req.function.(string)), zap.String("address", req.address.(string)), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Debug("Decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
}
} else {
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Error("Invalid request to decrease active requests", zap.String("function", req.function.(string)), zap.String("address", req.address.(string)), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
otelUtils.LoggerWithTraceID(req.ctx, c.logger).Error("Invalid request to decrease active requests", zap.String("function", req.function), zap.String("address", req.address), zap.Int("activeRequests", c.cache[req.function][req.address].activeRequests))
}
}
}
Expand All @@ -176,8 +176,8 @@ func (c *Cache) service() {
}
}

// GetValue returns a value interface with status inActive else return error
func (c *Cache) GetValue(ctx context.Context, function interface{}, requestsPerPod int) (interface{}, int, error) {
// GetValue returns a function service with status in Active else return error
func (c *PoolCache) GetSvcValue(ctx context.Context, function string, requestsPerPod int) (*FuncSvc, int, error) {
respChannel := make(chan *response)
c.requestChannel <- &request{
ctx: ctx,
Expand All @@ -191,7 +191,7 @@ func (c *Cache) GetValue(ctx context.Context, function interface{}, requestsPerP
}

// ListAvailableValue returns a list of the available function services stored in the Cache
func (c *Cache) ListAvailableValue() []interface{} {
func (c *PoolCache) ListAvailableValue() []*FuncSvc {
respChannel := make(chan *response)
c.requestChannel <- &request{
requestType: listAvailableValue,
Expand All @@ -202,7 +202,7 @@ func (c *Cache) ListAvailableValue() []interface{} {
}

// SetValue marks the value at key [function][address] as active(begin used)
func (c *Cache) SetValue(ctx context.Context, function, address, value interface{}, cpuLimit resource.Quantity) {
func (c *PoolCache) SetSvcValue(ctx context.Context, function, address string, value *FuncSvc, cpuLimit resource.Quantity) {
respChannel := make(chan *response)
c.requestChannel <- &request{
ctx: ctx,
Expand All @@ -216,7 +216,7 @@ func (c *Cache) SetValue(ctx context.Context, function, address, value interface
}

// SetCPUUtilization updates/sets the CPU utilization limit for the pod
func (c *Cache) SetCPUUtilization(function, address interface{}, cpuUsage resource.Quantity) {
func (c *PoolCache) SetCPUUtilization(function, address string, cpuUsage resource.Quantity) {
c.requestChannel <- &request{
requestType: setCPUUtilization,
function: function,
Expand All @@ -227,7 +227,7 @@ func (c *Cache) SetCPUUtilization(function, address interface{}, cpuUsage resour
}

// MarkAvailable marks the value at key [function][address] as available
func (c *Cache) MarkAvailable(function, address interface{}) {
func (c *PoolCache) MarkAvailable(function, address string) {
respChannel := make(chan *response)
c.requestChannel <- &request{
requestType: markAvailable,
Expand All @@ -238,7 +238,7 @@ func (c *Cache) MarkAvailable(function, address interface{}) {
}

// DeleteValue deletes the value at key composed of [function][address]
func (c *Cache) DeleteValue(ctx context.Context, function, address interface{}) error {
func (c *PoolCache) DeleteValue(ctx context.Context, function, address string) error {
respChannel := make(chan *response)
c.requestChannel <- &request{
ctx: ctx,
Expand Down
@@ -1,4 +1,4 @@
package poolcache
package fscache

import (
"context"
Expand All @@ -22,11 +22,17 @@ func TestPoolCache(t *testing.T) {
logger := loggerfactory.GetLogger()
c := NewPoolCache(logger)

c.SetValue(ctx, "func", "ip", "value", resource.MustParse("45m"))
c.SetSvcValue(ctx, "func", "ip", &FuncSvc{
Name: "value",
}, resource.MustParse("45m"))

c.SetValue(ctx, "func2", "ip2", "value2", resource.MustParse("50m"))
c.SetSvcValue(ctx, "func2", "ip2", &FuncSvc{
Name: "value2",
}, resource.MustParse("50m"))

c.SetValue(ctx, "func2", "ip22", "value22", resource.MustParse("33m"))
c.SetSvcValue(ctx, "func2", "ip22", &FuncSvc{
Name: "value22",
}, resource.MustParse("33m"))

checkErr(c.DeleteValue(ctx, "func2", "ip2"))

Expand All @@ -37,28 +43,30 @@ func TestPoolCache(t *testing.T) {

c.MarkAvailable("func", "ip")

_, active, err := c.GetValue(ctx, "func", 5)
_, active, err := c.GetSvcValue(ctx, "func", 5)
if active != 1 {
log.Panicln("Expected 1 active, found", active)
}
checkErr(err)

checkErr(c.DeleteValue(ctx, "func", "ip"))

_, _, err = c.GetValue(ctx, "func", 5)
_, _, err = c.GetSvcValue(ctx, "func", 5)
if err == nil {
log.Panicf("found deleted element")
}

c.SetValue(ctx, "cpulimit", "100", "value", resource.MustParse("3m"))
c.SetSvcValue(ctx, "cpulimit", "100", &FuncSvc{
Name: "value",
}, resource.MustParse("3m"))
c.SetCPUUtilization("cpulimit", "100", resource.MustParse("4m"))

_, _, err = c.GetValue(ctx, "cpulimit", 5)
_, _, err = c.GetSvcValue(ctx, "cpulimit", 5)

if err == nil {
log.Panicf("received pod address with higher CPU usage than limit")
}
c.SetCPUUtilization("cpulimit", "100", resource.MustParse("2m"))
_, _, err = c.GetValue(ctx, "cpulimit", 5)
_, _, err = c.GetSvcValue(ctx, "cpulimit", 5)
checkErr(err)
}

0 comments on commit 0edf264

Please sign in to comment.