Skip to content

Commit

Permalink
feat: add trace and timeout support for the timer (#2750)
Browse files Browse the repository at this point in the history
* feat: add trace and timeout support for the timer

Co-authored-by: gw123 <iamakillerforyou@gmail.com>
Signed-off-by: saltbo <saltbo@foxmail.com>

* fix: add error check for the lint

Signed-off-by: saltbo <saltbo@foxmail.com>

---------

Signed-off-by: saltbo <saltbo@foxmail.com>
Co-authored-by: gw123 <iamakillerforyou@gmail.com>
  • Loading branch information
saltbo and gw123 committed Mar 29, 2023
1 parent 1f138d0 commit 2213ebc
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/kubewatcher/kubewatcher.go
Expand Up @@ -268,7 +268,7 @@ func (ws *watchSubscription) eventDispatchLoop(ctx context.Context) {
// the triggers can only be created in the same namespace as the function.
// so essentially, function namespace = trigger namespace.
url := utils.UrlForFunction(ws.watch.Spec.FunctionReference.Name, ws.watch.ObjectMeta.Namespace)
ws.publisher.Publish(buf.String(), headers, url)
ws.publisher.Publish(ctx, buf.String(), headers, url)
}
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/publisher/publisher.go
Expand Up @@ -16,13 +16,15 @@ limitations under the License.

package publisher

import "context"

type (
// Publisher interface wraps the Publish method that publishes an request
// with given "body" and "headers" to given "target"
Publisher interface {
// Publish an request to a "target". Target's meaning depends on the
// Publish a request to a "target". Target's meaning depends on the
// publisher: it's a URL in the case of a webhook publisher, or a queue
// name in a queue-based publisher such as NATS.
Publish(body string, headers map[string]string, target string)
Publish(ctx context.Context, body string, headers map[string]string, target string)
}
)
34 changes: 34 additions & 0 deletions pkg/publisher/publisher_test.go
@@ -0,0 +1,34 @@
package publisher

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/fission/fission/pkg/utils/loggerfactory"
otelUtils "github.com/fission/fission/pkg/utils/otel"
"github.com/stretchr/testify/assert"
)

func TestPublisher(t *testing.T) {
fnName := "test-fn"
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/"+fnName, r.URL.Path)
assert.Equal(t, "aaa", r.Header.Get("X-Fission-Test"))
assert.Contains(t, r.Header, "Traceparent")
}))

ctx := context.Background()
logger := loggerfactory.GetLogger()
shutdown, err := otelUtils.InitProvider(ctx, logger, fnName)
assert.NoError(t, err)
if shutdown != nil {
defer shutdown(ctx)
}

wp := MakeWebhookPublisher(logger, s.URL)
wp.Publish(ctx, "", map[string]string{"X-Fission-Test": "aaa"}, fnName)
time.Sleep(time.Second * 1)
}
22 changes: 19 additions & 3 deletions pkg/publisher/webhookPublisher.go
Expand Up @@ -18,12 +18,17 @@ package publisher

import (
"bytes"
"context"
"io"
"net/http"
"strings"
"time"

otelUtils "github.com/fission/fission/pkg/utils/otel"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"golang.org/x/net/context/ctxhttp"
)

type (
Expand All @@ -37,8 +42,10 @@ type (
retryDelay time.Duration

baseURL string
timeout time.Duration
}
publishRequest struct {
ctx context.Context
body string
headers map[string]string
target string
Expand All @@ -54,6 +61,8 @@ func MakeWebhookPublisher(logger *zap.Logger, baseURL string) *WebhookPublisher
baseURL: baseURL,
requestChannel: make(chan *publishRequest, 32), // buffered channel
// TODO make this configurable
timeout: 60 * time.Minute,
// TODO make this configurable
maxRetries: 10,
retryDelay: 500 * time.Millisecond,
}
Expand All @@ -62,9 +71,14 @@ func MakeWebhookPublisher(logger *zap.Logger, baseURL string) *WebhookPublisher
}

// Publish sends a request to the target with payload having given body and headers
func (p *WebhookPublisher) Publish(body string, headers map[string]string, target string) {
func (p *WebhookPublisher) Publish(ctx context.Context, body string, headers map[string]string, target string) {
tracer := otel.Tracer("WebhookPublisher")
ctx, span := tracer.Start(ctx, "WebhookPublisher/Publish")
defer span.End()

// serializing the request gives user a guarantee that the request is sent in sequence order
p.requestChannel <- &publishRequest{
ctx: ctx,
body: body,
headers: headers,
target: target,
Expand All @@ -89,7 +103,7 @@ func (p *WebhookPublisher) makeHTTPRequest(r *publishRequest) {

// log once for this request
defer func() {
if ce := p.logger.Check(level, msg); ce != nil {
if ce := otelUtils.LoggerWithTraceID(r.ctx, p.logger).Check(level, msg); ce != nil {
ce.Write(fields...)
}
}()
Expand All @@ -107,7 +121,9 @@ func (p *WebhookPublisher) makeHTTPRequest(r *publishRequest) {
req.Header.Set(k, v)
}
// Make the request
resp, err := http.DefaultClient.Do(req)
ctx, cancel := context.WithTimeout(r.ctx, p.timeout)
defer cancel()
resp, err := ctxhttp.Do(ctx, otelhttp.DefaultClient, req)
if err != nil {
fields = append(fields, zap.Error(err), zap.Any("request", r))
} else {
Expand Down
4 changes: 3 additions & 1 deletion pkg/timer/timer.go
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package timer

import (
"context"

"github.com/robfig/cron/v3"
"go.uber.org/zap"

Expand Down Expand Up @@ -66,7 +68,7 @@ func (timer *Timer) newCron(t fv1.TimeTrigger) *cron.Cron {
// with the addition of multi-tenancy, the users can create functions in any namespace. however,
// the triggers can only be created in the same namespace as the function.
// so essentially, function namespace = trigger namespace.
(*timer.publisher).Publish("", headers, utils.UrlForFunction(t.Spec.FunctionReference.Name, t.Namespace))
(*timer.publisher).Publish(context.Background(), "", headers, utils.UrlForFunction(t.Spec.FunctionReference.Name, t.Namespace))
})
c.Start()
timer.logger.Info("started cron for time trigger", zap.String("trigger_name", t.Name), zap.String("trigger_namespace", t.Namespace), zap.String("cron", t.Spec.Cron))
Expand Down

0 comments on commit 2213ebc

Please sign in to comment.