From 2213ebc637f16b8c596db21ba12a3614cbeab925 Mon Sep 17 00:00:00 2001 From: Ambor Date: Wed, 29 Mar 2023 11:57:55 +0800 Subject: [PATCH] feat: add trace and timeout support for the timer (#2750) * feat: add trace and timeout support for the timer Co-authored-by: gw123 Signed-off-by: saltbo * fix: add error check for the lint Signed-off-by: saltbo --------- Signed-off-by: saltbo Co-authored-by: gw123 --- pkg/kubewatcher/kubewatcher.go | 2 +- pkg/publisher/publisher.go | 6 ++++-- pkg/publisher/publisher_test.go | 34 +++++++++++++++++++++++++++++++ pkg/publisher/webhookPublisher.go | 22 +++++++++++++++++--- pkg/timer/timer.go | 4 +++- 5 files changed, 61 insertions(+), 7 deletions(-) create mode 100644 pkg/publisher/publisher_test.go diff --git a/pkg/kubewatcher/kubewatcher.go b/pkg/kubewatcher/kubewatcher.go index 80af2d0e15..297801f168 100644 --- a/pkg/kubewatcher/kubewatcher.go +++ b/pkg/kubewatcher/kubewatcher.go @@ -268,7 +268,7 @@ func (ws *watchSubscription) eventDispatchLoop(ctx context.Context) { // the triggers can only be created in the same namespace as the function. // so essentially, function namespace = trigger namespace. url := utils.UrlForFunction(ws.watch.Spec.FunctionReference.Name, ws.watch.ObjectMeta.Namespace) - ws.publisher.Publish(buf.String(), headers, url) + ws.publisher.Publish(ctx, buf.String(), headers, url) } } diff --git a/pkg/publisher/publisher.go b/pkg/publisher/publisher.go index 918963a896..96597614a7 100644 --- a/pkg/publisher/publisher.go +++ b/pkg/publisher/publisher.go @@ -16,13 +16,15 @@ limitations under the License. package publisher +import "context" + type ( // Publisher interface wraps the Publish method that publishes an request // with given "body" and "headers" to given "target" Publisher interface { - // Publish an request to a "target". Target's meaning depends on the + // Publish a request to a "target". Target's meaning depends on the // publisher: it's a URL in the case of a webhook publisher, or a queue // name in a queue-based publisher such as NATS. - Publish(body string, headers map[string]string, target string) + Publish(ctx context.Context, body string, headers map[string]string, target string) } ) diff --git a/pkg/publisher/publisher_test.go b/pkg/publisher/publisher_test.go new file mode 100644 index 0000000000..3f27a58c00 --- /dev/null +++ b/pkg/publisher/publisher_test.go @@ -0,0 +1,34 @@ +package publisher + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/fission/fission/pkg/utils/loggerfactory" + otelUtils "github.com/fission/fission/pkg/utils/otel" + "github.com/stretchr/testify/assert" +) + +func TestPublisher(t *testing.T) { + fnName := "test-fn" + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/"+fnName, r.URL.Path) + assert.Equal(t, "aaa", r.Header.Get("X-Fission-Test")) + assert.Contains(t, r.Header, "Traceparent") + })) + + ctx := context.Background() + logger := loggerfactory.GetLogger() + shutdown, err := otelUtils.InitProvider(ctx, logger, fnName) + assert.NoError(t, err) + if shutdown != nil { + defer shutdown(ctx) + } + + wp := MakeWebhookPublisher(logger, s.URL) + wp.Publish(ctx, "", map[string]string{"X-Fission-Test": "aaa"}, fnName) + time.Sleep(time.Second * 1) +} diff --git a/pkg/publisher/webhookPublisher.go b/pkg/publisher/webhookPublisher.go index 0123e7ba1a..79c7d4b6cb 100644 --- a/pkg/publisher/webhookPublisher.go +++ b/pkg/publisher/webhookPublisher.go @@ -18,12 +18,17 @@ package publisher import ( "bytes" + "context" "io" "net/http" "strings" "time" + otelUtils "github.com/fission/fission/pkg/utils/otel" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" "go.uber.org/zap" + "golang.org/x/net/context/ctxhttp" ) type ( @@ -37,8 +42,10 @@ type ( retryDelay time.Duration baseURL string + timeout time.Duration } publishRequest struct { + ctx context.Context body string headers map[string]string target string @@ -54,6 +61,8 @@ func MakeWebhookPublisher(logger *zap.Logger, baseURL string) *WebhookPublisher baseURL: baseURL, requestChannel: make(chan *publishRequest, 32), // buffered channel // TODO make this configurable + timeout: 60 * time.Minute, + // TODO make this configurable maxRetries: 10, retryDelay: 500 * time.Millisecond, } @@ -62,9 +71,14 @@ func MakeWebhookPublisher(logger *zap.Logger, baseURL string) *WebhookPublisher } // Publish sends a request to the target with payload having given body and headers -func (p *WebhookPublisher) Publish(body string, headers map[string]string, target string) { +func (p *WebhookPublisher) Publish(ctx context.Context, body string, headers map[string]string, target string) { + tracer := otel.Tracer("WebhookPublisher") + ctx, span := tracer.Start(ctx, "WebhookPublisher/Publish") + defer span.End() + // serializing the request gives user a guarantee that the request is sent in sequence order p.requestChannel <- &publishRequest{ + ctx: ctx, body: body, headers: headers, target: target, @@ -89,7 +103,7 @@ func (p *WebhookPublisher) makeHTTPRequest(r *publishRequest) { // log once for this request defer func() { - if ce := p.logger.Check(level, msg); ce != nil { + if ce := otelUtils.LoggerWithTraceID(r.ctx, p.logger).Check(level, msg); ce != nil { ce.Write(fields...) } }() @@ -107,7 +121,9 @@ func (p *WebhookPublisher) makeHTTPRequest(r *publishRequest) { req.Header.Set(k, v) } // Make the request - resp, err := http.DefaultClient.Do(req) + ctx, cancel := context.WithTimeout(r.ctx, p.timeout) + defer cancel() + resp, err := ctxhttp.Do(ctx, otelhttp.DefaultClient, req) if err != nil { fields = append(fields, zap.Error(err), zap.Any("request", r)) } else { diff --git a/pkg/timer/timer.go b/pkg/timer/timer.go index 123d3645e8..503dc8241d 100644 --- a/pkg/timer/timer.go +++ b/pkg/timer/timer.go @@ -17,6 +17,8 @@ limitations under the License. package timer import ( + "context" + "github.com/robfig/cron/v3" "go.uber.org/zap" @@ -66,7 +68,7 @@ func (timer *Timer) newCron(t fv1.TimeTrigger) *cron.Cron { // with the addition of multi-tenancy, the users can create functions in any namespace. however, // the triggers can only be created in the same namespace as the function. // so essentially, function namespace = trigger namespace. - (*timer.publisher).Publish("", headers, utils.UrlForFunction(t.Spec.FunctionReference.Name, t.Namespace)) + (*timer.publisher).Publish(context.Background(), "", headers, utils.UrlForFunction(t.Spec.FunctionReference.Name, t.Namespace)) }) c.Start() timer.logger.Info("started cron for time trigger", zap.String("trigger_name", t.Name), zap.String("trigger_namespace", t.Namespace), zap.String("cron", t.Spec.Cron))