Ensure handling for specialization failure in pool manager (#2788)
* Add fixes for failure in specialization
* Reduce specialization-in-progress count and remove expired requests from the queue when specialization times out
* Rename markSpecializationFailure and remove the logger from the queue
* Refactor cleanup code in api.go and add a test case for the queue

Details:

- Clean up the svcWaiting counter in the pool manager if specialization fails (see the sketch below)
- Clean up the active requests counter in the pool manager if the client exits while we have already allocated a function service for it
- When waiting for a ready pod, honor the specialization timeout if the pod ready timeout exceeds it, and stop waiting if the request to choosePod is cancelled
- If requests are still waiting for a function service but no pod is being specialized for them, clean up those waiting requests
---------

Signed-off-by: Sanket Sudake <sanketsudake@gmail.com>
Co-authored-by: Pranoy Kundu <pranoy1998k@gmail.com>
sanketsudake and pranoyk committed May 11, 2023
1 parent 31c81e1 commit 6c431e4
Showing 10 changed files with 177 additions and 2 deletions.
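
The core of the api.go change is the branch below: on client cancellation or specialization error, either release the allocated function service or undo the specialization bookkeeping. A minimal, self-contained sketch of that decision, using hypothetical stand-ins (funcSvc, untapService, markSpecializationFailure, getServiceAddress) rather than the real pkg/executor types:

package main

import (
	"context"
	"errors"
	"fmt"
)

type funcSvc struct{ Address string }

// Hypothetical stand-ins for the executor-type methods touched by this commit.
func untapService(addr string)             { fmt.Println("untap service at", addr) }
func markSpecializationFailure(key string) { fmt.Println("mark specialization failure for", key) }

// cleanup mirrors the branching added in api.go: release the allocated service
// if we got one, otherwise undo the specialization-in-progress bookkeeping so
// later requests are not stuck waiting.
func cleanup(svc *funcSvc, key string) {
	if svc != nil {
		untapService(svc.Address)
		return
	}
	markSpecializationFailure(key)
}

func getServiceAddress(ctx context.Context, key string, svc *funcSvc, specErr error) (string, error) {
	// Client disconnected while we were still specializing: clean up and
	// report a 499-style "client closed request" error.
	if errors.Is(ctx.Err(), context.Canceled) {
		cleanup(svc, key)
		return "", errors.New("client left early while waiting for the function service")
	}
	// Specialization itself failed: clean up and propagate the error.
	if specErr != nil {
		cleanup(svc, key)
		return "", specErr
	}
	return svc.Address, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the client going away
	_, err := getServiceAddress(ctx, "default/hello", nil, nil)
	fmt.Println(err)
}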
19 changes: 19 additions & 0 deletions pkg/executor/api.go
@@ -31,8 +31,10 @@ import (
"go.uber.org/zap"

fv1 "github.com/fission/fission/pkg/apis/core/v1"
"github.com/fission/fission/pkg/crd"
ferror "github.com/fission/fission/pkg/error"
"github.com/fission/fission/pkg/executor/client"
"github.com/fission/fission/pkg/executor/fscache"
"github.com/fission/fission/pkg/utils/httpserver"
"github.com/fission/fission/pkg/utils/metrics"
otelUtils "github.com/fission/fission/pkg/utils/otel"
@@ -148,7 +150,24 @@ func (executor *Executor) getServiceForFunction(ctx context.Context, fn *fv1.Fun
respChan: respChan,
}
resp := <-respChan
cleanUp := func(funcSvc *fscache.FuncSvc) {
et, ok := executor.executorTypes[fn.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType]
if !ok {
executor.logger.Error("unknown executor type received in function service", zap.Any("executor", funcSvc.Executor))
return
}
if funcSvc != nil {
et.UnTapService(ctx, crd.CacheKey(funcSvc.Function), resp.funcSvc.Address)
} else {
et.MarkSpecializationFailure(ctx, crd.CacheKey(&fn.ObjectMeta))
}
}
if errors.Is(ctx.Err(), context.Canceled) {
cleanUp(resp.funcSvc)
return "", ferror.MakeError(499, "client leave early in the process of getServiceForFunction")
}
if resp.err != nil {
cleanUp(resp.funcSvc)
return "", resp.err
}
return resp.funcSvc.Address, resp.err
5 changes: 5 additions & 0 deletions pkg/executor/executortype/container/containermgr.go
@@ -177,6 +177,11 @@ func (caaf *Container) UnTapService(ctx context.Context, key string, svcHost str
// Not Implemented for CaaF.
}

// MarkSpecializationFailure has not been implemented for CaaF.
func (caaf *Container) MarkSpecializationFailure(ctx context.Context, key string) {
// Not Implemented for CaaF.
}

// GetFuncSvc returns a function service; error otherwise.
func (caaf *Container) GetFuncSvc(ctx context.Context, fn *fv1.Function) (*fscache.FuncSvc, error) {
return caaf.createFunction(ctx, fn)
3 changes: 3 additions & 0 deletions pkg/executor/executortype/executortype.go
@@ -48,6 +48,9 @@ type ExecutorType interface {
// UnTapService updates the isActive to false
UnTapService(ctx context.Context, key string, svcHost string)

// MarkSpecializationFailure reduces the svcWaiting count in funcSvcGroup when specialization fails
MarkSpecializationFailure(ctx context.Context, key string)

// IsValid returns true if a function service is valid. Different executor types
// use distinct ways to examine the function service.
IsValid(context.Context, *fscache.FuncSvc) bool
5 changes: 5 additions & 0 deletions pkg/executor/executortype/newdeploy/newdeploymgr.go
@@ -199,6 +199,11 @@ func (deploy *NewDeploy) UnTapService(ctx context.Context, key string, svcHost s
// Not Implemented for NewDeployment. Will be used when support of concurrent specialization of same function is added.
}

// MarkSpecializationFailure has not been implemented for NewDeployment.
func (deploy *NewDeploy) MarkSpecializationFailure(ctx context.Context, key string) {
// Not Implemented for NewDeployment. Will be used when support of concurrent specialization of same function is added.
}

// TapService makes a TouchByAddress request to the cache.
func (deploy *NewDeploy) TapService(ctx context.Context, svcHost string) error {
otelUtils.SpanTrackEvent(ctx, "TapService")
16 changes: 14 additions & 2 deletions pkg/executor/executortype/poolmgr/gp.go
@@ -226,6 +226,14 @@ func (gp *GenericPool) updateCPUUtilizationSvc(ctx context.Context) {
// returns the key and pod API object.
func (gp *GenericPool) choosePod(ctx context.Context, newLabels map[string]string) (string, *apiv1.Pod, error) {
startTime := time.Now()
podTimeout := startTime.Add(gp.podReadyTimeout)
deadline, ok := ctx.Deadline()
if ok {
deadline = deadline.Add(-1 * time.Second)
if deadline.Before(podTimeout) {
podTimeout = deadline
}
}
expoDelay := 100 * time.Millisecond
logger := otelUtils.LoggerWithTraceID(ctx, gp.logger)
if !cache.WaitForCacheSync(ctx.Done(), gp.readyPodListerSynced) {
@@ -234,10 +242,14 @@ func (gp *GenericPool) choosePod(ctx context.Context, newLabels map[string]strin
}
for {
// Retries took too long, error out.
if time.Since(startTime) > gp.podReadyTimeout {
logger.Error("timed out waiting for pod", zap.Any("labels", newLabels), zap.Duration("timeout", gp.podReadyTimeout))
if time.Now().After(podTimeout) {
logger.Error("timed out waiting for pod", zap.Any("labels", newLabels), zap.Duration("timeout", podTimeout.Sub(startTime)))
return "", nil, errors.New("timeout: waited too long to get a ready pod")
}
if ctx.Err() != nil {
logger.Error("context canceled while waiting for pod", zap.Any("labels", newLabels), zap.Duration("timeout", podTimeout.Sub(startTime)))
return "", nil, fmt.Errorf("context canceled while waiting for pod: %w", ctx.Err())
}

var chosenPod *apiv1.Pod
var key string
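
The choosePod change caps the pod-ready wait at the request's own deadline (minus a one-second buffer) and bails out on cancellation. A self-contained sketch of that clamping pattern, with a hypothetical waitReady loop (fixed 100 ms retries instead of the real exponential backoff) standing in for the actual pod lookup:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// effectiveTimeout clamps a configured timeout to the context's deadline,
// leaving a one-second buffer so the caller can still observe the timeout
// error before the context itself expires.
func effectiveTimeout(ctx context.Context, start time.Time, configured time.Duration) time.Time {
	timeout := start.Add(configured)
	if deadline, ok := ctx.Deadline(); ok {
		deadline = deadline.Add(-1 * time.Second)
		if deadline.Before(timeout) {
			timeout = deadline
		}
	}
	return timeout
}

// waitReady is a stand-in for the retry loop in choosePod: it gives up either
// when the clamped timeout passes or when the context is cancelled.
func waitReady(ctx context.Context, configured time.Duration, ready func() bool) error {
	start := time.Now()
	timeout := effectiveTimeout(ctx, start, configured)
	for {
		if time.Now().After(timeout) {
			return errors.New("timeout: waited too long to get a ready pod")
		}
		if ctx.Err() != nil {
			return fmt.Errorf("context canceled while waiting: %w", ctx.Err())
		}
		if ready() {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// The pod never becomes ready; the 5-minute configured timeout is clamped to ~1s.
	err := waitReady(ctx, 5*time.Minute, func() bool { return false })
	fmt.Println(err)
}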
8 changes: 8 additions & 0 deletions pkg/executor/executortype/poolmgr/gpm.go
@@ -252,6 +252,14 @@ func (gpm *GenericPoolManager) TapService(ctx context.Context, svcHost string) e
return nil
}

// MarkSpecializationFailure reduces the specialization-in-progress count for the function in the pool cache.
func (gpm *GenericPoolManager) MarkSpecializationFailure(ctx context.Context, key string) {
otelUtils.SpanTrackEvent(ctx, "MarkSpecializationFailure",
attribute.KeyValue{Key: "key", Value: attribute.StringValue(key)})
logger := otelUtils.LoggerWithTraceID(ctx, gpm.logger)
logger.Info("marking specialization failure", zap.Any("key", key))
gpm.fsCache.MarkSpecializationFailure(key)
}

// IsValid checks if pod is not deleted and that it has the address passed as the argument. Also checks that all the
// containers in it are reporting a ready status for the healthCheck.
func (gpm *GenericPoolManager) IsValid(ctx context.Context, fsvc *fscache.FuncSvc) bool {
4 changes: 4 additions & 0 deletions pkg/executor/fscache/functionServiceCache.go
@@ -244,6 +244,10 @@ func (fsc *FunctionServiceCache) MarkAvailable(key string, svcHost string) {
fsc.connFunctionCache.MarkAvailable(key, svcHost)
}

// MarkSpecializationFailure marks a failed specialization for the function in the pool cache.
func (fsc *FunctionServiceCache) MarkSpecializationFailure(key string) {
fsc.connFunctionCache.MarkSpecializationFailure(key)
}

// Add adds a function service to cache if it does not exist already.
func (fsc *FunctionServiceCache) Add(fsvc FuncSvc) (*FuncSvc, error) {
existing, err := fsc.byFunction.Set(crd.CacheKey(fsvc.Function), &fsvc)
18 changes: 18 additions & 0 deletions pkg/executor/fscache/poolcache.go
@@ -36,6 +36,7 @@ const (
markAvailable
deleteValue
setCPUUtilization
markSpecializationFailure
)

type (
@@ -232,6 +233,14 @@ func (c *PoolCache) service() {
}
}
}
case markSpecializationFailure:
if c.cache[req.function].svcWaiting > c.cache[req.function].queue.Len() {
c.cache[req.function].svcWaiting--
if c.cache[req.function].svcWaiting == c.cache[req.function].queue.Len() {
expiredRequests := c.cache[req.function].queue.Expired()
c.cache[req.function].svcWaiting = c.cache[req.function].svcWaiting - expiredRequests
}
}
case deleteValue:
delete(c.cache[req.function].svcs, req.address)
req.responseChannel <- resp
@@ -328,3 +337,12 @@ func (c *PoolCache) DeleteValue(ctx context.Context, function, address string) e
resp := <-respChannel
return resp.error
}

// MarkSpecializationFailure reduces the svcWaiting count and removes expired requests from the queue when appropriate
func (c *PoolCache) MarkSpecializationFailure(function string) {
c.requestChannel <- &request{
requestType: markSpecializationFailure,
function: function,
responseChannel: make(chan *response),
}
}
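
A rough illustration of the bookkeeping in the markSpecializationFailure case above, with a plain slice standing in for the waiter queue; the real code keeps this state in PoolCache, mutates it only from the service() goroutine, and reaps expired waiters via Queue.Expired():

package main

import (
	"context"
	"fmt"
)

// waiter is a stand-in for the queue's svcWait entry: a request that is
// blocked until some specialization finishes.
type waiter struct{ ctx context.Context }

// group mirrors the per-function state: how many specializations are counted
// as in flight for waiters (svcWaiting) and the queue of blocked requests.
type group struct {
	svcWaiting int
	queue      []waiter
}

// markSpecializationFailure undoes one unit of svcWaiting and, once the
// remaining in-flight count matches the queue length, drops waiters whose
// contexts have already expired so they do not wait forever.
func (g *group) markSpecializationFailure() {
	if g.svcWaiting <= len(g.queue) {
		return
	}
	g.svcWaiting--
	if g.svcWaiting == len(g.queue) {
		kept := g.queue[:0]
		for _, w := range g.queue {
			if w.ctx.Err() != nil {
				g.svcWaiting-- // expired waiter: stop counting it
				continue
			}
			kept = append(kept, w)
		}
		g.queue = kept
	}
}

func main() {
	cancelled, cancel := context.WithCancel(context.Background())
	cancel()
	g := &group{svcWaiting: 2, queue: []waiter{{ctx: cancelled}}}
	g.markSpecializationFailure()
	fmt.Println(g.svcWaiting, len(g.queue)) // 0 0
}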
25 changes: 25 additions & 0 deletions pkg/executor/fscache/queue.go
@@ -38,6 +38,31 @@ func (q *Queue) Pop() *svcWait {
return svcWait
}

// Expired closes and removes queued waiters whose contexts have expired and returns how many were removed.
func (q *Queue) Expired() int {
q.mutex.Lock()
defer q.mutex.Unlock()

expired := 0
svcExpired := []*list.Element{}
for item := q.items.Front(); item != nil; item = item.Next() {
svcWait, ok := item.Value.(*svcWait)
if !ok {
continue
}
if svcWait.ctx.Err() != nil {
close(svcWait.svcChannel)
svcExpired = append(svcExpired, item)
expired = expired + 1
}
}

for _, item := range svcExpired {
q.items.Remove(item)
}

return expired
}

func (q *Queue) Len() int {
q.mutex.Lock()
defer q.mutex.Unlock()
76 changes: 76 additions & 0 deletions pkg/executor/fscache/queue_test.go
@@ -1,6 +1,7 @@
package fscache

import (
"context"
"sync"
"testing"
)
@@ -113,3 +114,78 @@ func TestQueueLen(t *testing.T) {
t.Errorf("Expected queue length to be 1, got %d", q.Len())
}
}

func TestExpiredWhenAllItemsExpired(t *testing.T) {
q := NewQueue()
if q.Expired() != 0 {
t.Errorf("Expected Expired to return 0, got %d", q.Expired())
}
ctx, cancel := context.WithCancel(context.Background())
item := &svcWait{
svcChannel: make(chan *FuncSvc),
ctx: ctx,
}
q.Push(item)
if q.Len() != 1 {
t.Errorf("Expected queue length to be 1, got %d", q.Len())
}
cancel()
if q.Expired() != 1 {
t.Errorf("Expected Expired to return 1, got %d", q.Expired())
}
if q.Len() != 0 {
t.Errorf("Expected queue length to be 0, got %d", q.Len())
}
}

func TestExpiredWhenFewItemsExpired(t *testing.T) {
q := NewQueue()
if q.Expired() != 0 {
t.Errorf("Expected Expired to return 0, got %d", q.Expired())
}
ctx, cancel := context.WithCancel(context.Background())

q.Push(&svcWait{
svcChannel: make(chan *FuncSvc),
ctx: ctx,
})
q.Push(&svcWait{
svcChannel: make(chan *FuncSvc),
ctx: context.Background(),
})
if q.Len() != 2 {
t.Errorf("Expected queue length to be 1, got %d", q.Len())
}
cancel()
if q.Expired() != 1 {
t.Errorf("Expected Expired to return 1, got %d", q.Expired())
}
if q.Len() != 1 {
t.Errorf("Expected queue length to be 0, got %d", q.Len())
}
}

func TestExpiredWhenNoItemsExpired(t *testing.T) {
q := NewQueue()
if q.Expired() != 0 {
t.Errorf("Expected Expired to return 0, got %d", q.Expired())
}

q.Push(&svcWait{
svcChannel: make(chan *FuncSvc),
ctx: context.Background(),
})
q.Push(&svcWait{
svcChannel: make(chan *FuncSvc),
ctx: context.Background(),
})
if q.Len() != 2 {
t.Errorf("Expected queue length to be 1, got %d", q.Len())
}
if q.Expired() != 0 {
t.Errorf("Expected Expired to return 1, got %d", q.Expired())
}
if q.Len() != 2 {
t.Errorf("Expected queue length to be 0, got %d", q.Len())
}
}
