From 267f7faf18309a72c984968e7b70d4aab35e5e50 Mon Sep 17 00:00:00 2001 From: Sanket Sudake Date: Sun, 29 Oct 2023 13:19:06 +0530 Subject: [PATCH] Add cause for all context timeouts (#2862) --- pkg/executor/executor.go | 8 ++++---- pkg/executor/reaper/reaper.go | 9 +++++---- pkg/fission-cli/cmd/function/test.go | 2 +- pkg/publisher/webhookPublisher.go | 3 ++- pkg/router/functionHandler.go | 6 +++--- pkg/tracker/tracker.go | 3 ++- 6 files changed, 17 insertions(+), 14 deletions(-) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 67f284aa5a..e2119bad04 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -126,8 +126,8 @@ func (executor *Executor) serveCreateFuncServices() { specializationTimeout = fv1.DefaultSpecializationTimeOut } - fnSpecializationTimeoutContext, cancel := context.WithTimeout(req.context, - time.Duration(specializationTimeout+buffer)*time.Second) + fnSpecializationTimeoutContext, cancel := context.WithTimeoutCause(req.context, + time.Duration(specializationTimeout+buffer)*time.Second, fmt.Errorf("function specialization timeout (%d)s exceeded", specializationTimeout+buffer)) defer cancel() fsvc, err := executor.createServiceForFunction(fnSpecializationTimeoutContext, req.function) @@ -169,8 +169,8 @@ func (executor *Executor) serveCreateFuncServices() { specializationTimeout = fv1.DefaultSpecializationTimeOut } - fnSpecializationTimeoutContext, cancel := context.WithTimeout(req.context, - time.Duration(specializationTimeout+buffer)*time.Second) + fnSpecializationTimeoutContext, cancel := context.WithTimeoutCause(req.context, + time.Duration(specializationTimeout+buffer)*time.Second, fmt.Errorf("function specialization timeout (%d)s exceeded", specializationTimeout+buffer)) defer cancel() fsvc, err := executor.createServiceForFunction(fnSpecializationTimeoutContext, req.function) diff --git a/pkg/executor/reaper/reaper.go b/pkg/executor/reaper/reaper.go index 3d97caceaa..66d4c139e1 100644 --- a/pkg/executor/reaper/reaper.go +++ b/pkg/executor/reaper/reaper.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" apiv1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -39,25 +40,25 @@ func CleanupKubeObject(ctx context.Context, logger *zap.Logger, kubeClient kuber switch strings.ToLower(kubeobj.Kind) { case "pod": err := kubeClient.CoreV1().Pods(kubeobj.Namespace).Delete(ctx, kubeobj.Name, metav1.DeleteOptions{}) - if err != nil { + if err != nil && !k8serrors.IsNotFound(err) { logger.Error("error cleaning up pod", zap.Error(err), zap.String("pod", kubeobj.Name)) } case "service": err := kubeClient.CoreV1().Services(kubeobj.Namespace).Delete(ctx, kubeobj.Name, metav1.DeleteOptions{}) - if err != nil { + if err != nil && !k8serrors.IsNotFound(err) { logger.Error("error cleaning up service", zap.Error(err), zap.String("service", kubeobj.Name)) } case "deployment": err := kubeClient.AppsV1().Deployments(kubeobj.Namespace).Delete(ctx, kubeobj.Name, delOpt) - if err != nil { + if err != nil && !k8serrors.IsNotFound(err) { logger.Error("error cleaning up deployment", zap.Error(err), zap.String("deployment", kubeobj.Name)) } case "horizontalpodautoscaler": err := kubeClient.AutoscalingV2().HorizontalPodAutoscalers(kubeobj.Namespace).Delete(ctx, kubeobj.Name, metav1.DeleteOptions{}) - if err != nil { + if err != nil && !k8serrors.IsNotFound(err) { logger.Error("error cleaning up horizontalpodautoscaler", zap.Error(err), zap.String("horizontalpodautoscaler", kubeobj.Name)) } diff --git a/pkg/fission-cli/cmd/function/test.go b/pkg/fission-cli/cmd/function/test.go index bacd61ef8f..194275b28c 100644 --- a/pkg/fission-cli/cmd/function/test.go +++ b/pkg/fission-cli/cmd/function/test.go @@ -131,7 +131,7 @@ func (opts *TestSubCommand) do(input cli.Input) error { ctx = input.Context() } else { var closeCtx context.CancelFunc - ctx, closeCtx = context.WithTimeout(input.Context(), reqTimeout*time.Second) + ctx, closeCtx = context.WithTimeoutCause(input.Context(), reqTimeout*time.Second, fmt.Errorf("function request timeout (%d)s exceeded", reqTimeout)) defer closeCtx() } diff --git a/pkg/publisher/webhookPublisher.go b/pkg/publisher/webhookPublisher.go index 79c7d4b6cb..513a988348 100644 --- a/pkg/publisher/webhookPublisher.go +++ b/pkg/publisher/webhookPublisher.go @@ -19,6 +19,7 @@ package publisher import ( "bytes" "context" + "fmt" "io" "net/http" "strings" @@ -121,7 +122,7 @@ func (p *WebhookPublisher) makeHTTPRequest(r *publishRequest) { req.Header.Set(k, v) } // Make the request - ctx, cancel := context.WithTimeout(r.ctx, p.timeout) + ctx, cancel := context.WithTimeoutCause(r.ctx, p.timeout, fmt.Errorf("webhook request timed out (%f)s exceeded ", p.timeout.Seconds())) defer cancel() resp, err := ctxhttp.Do(ctx, otelhttp.DefaultClient, req) if err != nil { diff --git a/pkg/router/functionHandler.go b/pkg/router/functionHandler.go index a48b80b0ed..c783d3333a 100644 --- a/pkg/router/functionHandler.go +++ b/pkg/router/functionHandler.go @@ -415,7 +415,7 @@ func (roundTripper *RetryingRoundTripper) setContext(req *http.Request) *http.Re // that user aborts connection before timeout. Otherwise, // the request won't be canceled until the deadline exceeded // which may be a potential security issue. - ctx, closeCtx := context.WithTimeout(req.Context(), roundTripper.funcTimeout) + ctx, closeCtx := context.WithTimeoutCause(req.Context(), roundTripper.funcTimeout, fmt.Errorf("roundtripper timeout (%f)s exceeded", roundTripper.funcTimeout.Seconds())) roundTripper.closeContextFunc = &closeCtx return req.WithContext(ctx) @@ -581,7 +581,7 @@ func (roundTripper RetryingRoundTripper) addForwardedHostHeader(req *http.Reques // unTapservice marks the serviceURL in executor's cache as inactive, so that it can be reused func (fh functionHandler) unTapService(ctx context.Context, fn *fv1.Function, serviceUrl *url.URL) error { fh.logger.Debug("UnTapService Called") - ctx, cancel := context.WithTimeout(ctx, fh.unTapServiceTimeout) + ctx, cancel := context.WithTimeoutCause(ctx, fh.unTapServiceTimeout, fmt.Errorf("unTapService timeout (%f)s exceeded", fh.unTapServiceTimeout.Seconds())) defer cancel() err := fh.executor.UnTapService(ctx, fn.ObjectMeta, fn.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType, serviceUrl) if err != nil { @@ -640,7 +640,7 @@ func (fh functionHandler) getServiceEntryFromExecutor(ctx context.Context) (serv var fContext context.Context if fh.function.Spec.FunctionTimeout > 0 { timeout := time.Second * time.Duration(fh.function.Spec.FunctionTimeout) - f, cancel := context.WithTimeout(ctx, timeout) + f, cancel := context.WithTimeoutCause(ctx, timeout, fmt.Errorf("function service entry timeout (%f)s exceeded", timeout.Seconds())) fContext = f defer cancel() } else { diff --git a/pkg/tracker/tracker.go b/pkg/tracker/tracker.go index 301a7769b4..fea11cd19a 100644 --- a/pkg/tracker/tracker.go +++ b/pkg/tracker/tracker.go @@ -19,6 +19,7 @@ import ( "bytes" "context" "errors" + "fmt" "net/http" "net/url" "os" @@ -97,7 +98,7 @@ func (t *Tracker) SendEvent(ctx context.Context, e Event) error { if err != nil { return err } - ctx, cancel := context.WithTimeout(req.Context(), HTTP_TIMEOUT) + ctx, cancel := context.WithTimeoutCause(req.Context(), HTTP_TIMEOUT, fmt.Errorf("tracker request timeout (%f)s exceeded", HTTP_TIMEOUT.Seconds())) defer cancel() req = req.WithContext(ctx)