Skip to content

Commit

Permalink
Add cause for all context timeouts (#2862)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketsudake committed Oct 29, 2023
1 parent 2223081 commit 267f7fa
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 14 deletions.
8 changes: 4 additions & 4 deletions pkg/executor/executor.go
Expand Up @@ -126,8 +126,8 @@ func (executor *Executor) serveCreateFuncServices() {
specializationTimeout = fv1.DefaultSpecializationTimeOut
}

fnSpecializationTimeoutContext, cancel := context.WithTimeout(req.context,
time.Duration(specializationTimeout+buffer)*time.Second)
fnSpecializationTimeoutContext, cancel := context.WithTimeoutCause(req.context,
time.Duration(specializationTimeout+buffer)*time.Second, fmt.Errorf("function specialization timeout (%d)s exceeded", specializationTimeout+buffer))
defer cancel()

fsvc, err := executor.createServiceForFunction(fnSpecializationTimeoutContext, req.function)
Expand Down Expand Up @@ -169,8 +169,8 @@ func (executor *Executor) serveCreateFuncServices() {
specializationTimeout = fv1.DefaultSpecializationTimeOut
}

fnSpecializationTimeoutContext, cancel := context.WithTimeout(req.context,
time.Duration(specializationTimeout+buffer)*time.Second)
fnSpecializationTimeoutContext, cancel := context.WithTimeoutCause(req.context,
time.Duration(specializationTimeout+buffer)*time.Second, fmt.Errorf("function specialization timeout (%d)s exceeded", specializationTimeout+buffer))
defer cancel()

fsvc, err := executor.createServiceForFunction(fnSpecializationTimeoutContext, req.function)
Expand Down
9 changes: 5 additions & 4 deletions pkg/executor/reaper/reaper.go
Expand Up @@ -22,6 +22,7 @@ import (

"go.uber.org/zap"
apiv1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand All @@ -39,25 +40,25 @@ func CleanupKubeObject(ctx context.Context, logger *zap.Logger, kubeClient kuber
switch strings.ToLower(kubeobj.Kind) {
case "pod":
err := kubeClient.CoreV1().Pods(kubeobj.Namespace).Delete(ctx, kubeobj.Name, metav1.DeleteOptions{})
if err != nil {
if err != nil && !k8serrors.IsNotFound(err) {
logger.Error("error cleaning up pod", zap.Error(err), zap.String("pod", kubeobj.Name))
}

case "service":
err := kubeClient.CoreV1().Services(kubeobj.Namespace).Delete(ctx, kubeobj.Name, metav1.DeleteOptions{})
if err != nil {
if err != nil && !k8serrors.IsNotFound(err) {
logger.Error("error cleaning up service", zap.Error(err), zap.String("service", kubeobj.Name))
}

case "deployment":
err := kubeClient.AppsV1().Deployments(kubeobj.Namespace).Delete(ctx, kubeobj.Name, delOpt)
if err != nil {
if err != nil && !k8serrors.IsNotFound(err) {
logger.Error("error cleaning up deployment", zap.Error(err), zap.String("deployment", kubeobj.Name))
}

case "horizontalpodautoscaler":
err := kubeClient.AutoscalingV2().HorizontalPodAutoscalers(kubeobj.Namespace).Delete(ctx, kubeobj.Name, metav1.DeleteOptions{})
if err != nil {
if err != nil && !k8serrors.IsNotFound(err) {
logger.Error("error cleaning up horizontalpodautoscaler", zap.Error(err), zap.String("horizontalpodautoscaler", kubeobj.Name))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/fission-cli/cmd/function/test.go
Expand Up @@ -131,7 +131,7 @@ func (opts *TestSubCommand) do(input cli.Input) error {
ctx = input.Context()
} else {
var closeCtx context.CancelFunc
ctx, closeCtx = context.WithTimeout(input.Context(), reqTimeout*time.Second)
ctx, closeCtx = context.WithTimeoutCause(input.Context(), reqTimeout*time.Second, fmt.Errorf("function request timeout (%d)s exceeded", reqTimeout))
defer closeCtx()
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/publisher/webhookPublisher.go
Expand Up @@ -19,6 +19,7 @@ package publisher
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -121,7 +122,7 @@ func (p *WebhookPublisher) makeHTTPRequest(r *publishRequest) {
req.Header.Set(k, v)
}
// Make the request
ctx, cancel := context.WithTimeout(r.ctx, p.timeout)
ctx, cancel := context.WithTimeoutCause(r.ctx, p.timeout, fmt.Errorf("webhook request timed out (%f)s exceeded ", p.timeout.Seconds()))
defer cancel()
resp, err := ctxhttp.Do(ctx, otelhttp.DefaultClient, req)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/router/functionHandler.go
Expand Up @@ -415,7 +415,7 @@ func (roundTripper *RetryingRoundTripper) setContext(req *http.Request) *http.Re
// that user aborts connection before timeout. Otherwise,
// the request won't be canceled until the deadline exceeded
// which may be a potential security issue.
ctx, closeCtx := context.WithTimeout(req.Context(), roundTripper.funcTimeout)
ctx, closeCtx := context.WithTimeoutCause(req.Context(), roundTripper.funcTimeout, fmt.Errorf("roundtripper timeout (%f)s exceeded", roundTripper.funcTimeout.Seconds()))
roundTripper.closeContextFunc = &closeCtx

return req.WithContext(ctx)
Expand Down Expand Up @@ -581,7 +581,7 @@ func (roundTripper RetryingRoundTripper) addForwardedHostHeader(req *http.Reques
// unTapservice marks the serviceURL in executor's cache as inactive, so that it can be reused
func (fh functionHandler) unTapService(ctx context.Context, fn *fv1.Function, serviceUrl *url.URL) error {
fh.logger.Debug("UnTapService Called")
ctx, cancel := context.WithTimeout(ctx, fh.unTapServiceTimeout)
ctx, cancel := context.WithTimeoutCause(ctx, fh.unTapServiceTimeout, fmt.Errorf("unTapService timeout (%f)s exceeded", fh.unTapServiceTimeout.Seconds()))
defer cancel()
err := fh.executor.UnTapService(ctx, fn.ObjectMeta, fn.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType, serviceUrl)
if err != nil {
Expand Down Expand Up @@ -640,7 +640,7 @@ func (fh functionHandler) getServiceEntryFromExecutor(ctx context.Context) (serv
var fContext context.Context
if fh.function.Spec.FunctionTimeout > 0 {
timeout := time.Second * time.Duration(fh.function.Spec.FunctionTimeout)
f, cancel := context.WithTimeout(ctx, timeout)
f, cancel := context.WithTimeoutCause(ctx, timeout, fmt.Errorf("function service entry timeout (%f)s exceeded", timeout.Seconds()))
fContext = f
defer cancel()
} else {
Expand Down
3 changes: 2 additions & 1 deletion pkg/tracker/tracker.go
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -97,7 +98,7 @@ func (t *Tracker) SendEvent(ctx context.Context, e Event) error {
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(req.Context(), HTTP_TIMEOUT)
ctx, cancel := context.WithTimeoutCause(req.Context(), HTTP_TIMEOUT, fmt.Errorf("tracker request timeout (%f)s exceeded", HTTP_TIMEOUT.Seconds()))
defer cancel()

req = req.WithContext(ctx)
Expand Down

0 comments on commit 267f7fa

Please sign in to comment.