Skip to content

Commit

Permalink
Adding a way to inject a request ID (#1046)
Browse files Browse the repository at this point in the history
* Adding a way to inject a request ID

It is very useful to associate a request ID to each incoming request,
this change allows to provide a function to do that via Server Option.
The change comes with a default function which will generate a new
request ID. The request ID is put in the request context along with a
common logger which always logs the request-id

We add gRPC interceptors to the server so it can get the request ID out
of the gRPC metadata and put it in the common logger stored in the
context so as all the log lines using the common logger from the context
will have the request ID logged
  • Loading branch information
reclaro committed Jun 14, 2018
1 parent 3790d34 commit e637661
Show file tree
Hide file tree
Showing 133 changed files with 10,665 additions and 14 deletions.
18 changes: 18 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,4 @@ ignored = ["github.com/fnproject/fn/cli",
[[constraint]]
name = "github.com/dchest/siphash"
version = "1.1.0"

29 changes: 17 additions & 12 deletions api/agent/pure_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
"github.com/fnproject/fn/fnext"
"github.com/fnproject/fn/grpcutil"
"github.com/go-openapi/strfmt"
"github.com/golang/protobuf/ptypes/empty"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -206,6 +207,7 @@ func (ch *callHandle) enqueueCallResponse(err error) {
var details string
var errCode int
var errStr string
log := common.Logger(ch.ctx)

if err != nil {
errCode = models.GetAPIErrorCode(err)
Expand All @@ -215,8 +217,7 @@ func (ch *callHandle) enqueueCallResponse(err error) {
if ch.c != nil {
details = ch.c.Model().ID
}

common.Logger(ch.ctx).Debugf("Sending Call Finish details=%v", details)
log.Debugf("Sending Call Finish details=%v", details)

errTmp := ch.enqueueMsgStrict(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Expand All @@ -227,13 +228,13 @@ func (ch *callHandle) enqueueCallResponse(err error) {
}}})

if errTmp != nil {
common.Logger(ch.ctx).WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v err=%v:%v", details, errCode, errStr)
log.WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v err=%v:%v", details, errCode, errStr)
return
}

errTmp = ch.finalize()
if errTmp != nil {
common.Logger(ch.ctx).WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v err=%v:%v", details, errCode, errStr)
log.WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v err=%v:%v", details, errCode, errStr)
}
}

Expand Down Expand Up @@ -553,22 +554,21 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error
// Handles a client engagement
func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error {
grpc.EnableTracing = false

ctx := engagement.Context()
log := common.Logger(ctx)
// Keep lightweight tabs on what this runner is doing: for draindown tests
atomic.AddInt32(&pr.inflight, 1)
defer atomic.AddInt32(&pr.inflight, -1)

log := common.Logger(engagement.Context())
pv, ok := peer.FromContext(engagement.Context())
pv, ok := peer.FromContext(ctx)
log.Debug("Starting engagement")
if ok {
log.Debug("Peer is ", pv)
}
md, ok := metadata.FromIncomingContext(engagement.Context())
md, ok := metadata.FromIncomingContext(ctx)
if ok {
log.Debug("MD is ", md)
}

state := NewCallHandle(engagement)

tryMsg := state.getTryMsg()
Expand Down Expand Up @@ -713,11 +713,16 @@ func creds(cert string, key string, ca string) (credentials.TransportCredentials

func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials) (*pureRunner, error) {
var srv *grpc.Server
var opts []grpc.ServerOption

sInterceptor := grpc.StreamInterceptor(grpcutil.RIDStreamServerInterceptor)
uInterceptor := grpc.UnaryInterceptor(grpcutil.RIDUnaryServerInterceptor)
opts = append(opts, sInterceptor)
opts = append(opts, uInterceptor)
if creds != nil {
srv = grpc.NewServer(grpc.Creds(creds))
} else {
srv = grpc.NewServer()
opts = append(opts, grpc.Creds(creds))
}
srv = grpc.NewServer(opts...)

pr := &pureRunner{
gRPCServer: srv,
Expand Down
7 changes: 7 additions & 0 deletions api/agent/runner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

pb "github.com/fnproject/fn/api/agent/grpc"
Expand Down Expand Up @@ -118,6 +119,12 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
return true, err
}

rid := common.RequestIDFromContext(ctx)
if rid != "" {
// Create a new gRPC metadata where we store the request ID
mp := metadata.Pairs(common.RequestIDContextKey, rid)
ctx = metadata.NewOutgoingContext(ctx, mp)
}
runnerConnection, err := r.client.Engage(ctx)
if err != nil {
log.WithError(err).Error("Unable to create client to runner node")
Expand Down
8 changes: 8 additions & 0 deletions api/common/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (

type contextKey string

// RequestIDContextKey is the name of the key used to store the request ID into the context
const RequestIDContextKey = "fn_request_id"

//WithRequestID stores a request ID into the context
func WithRequestID(ctx context.Context, rid string) context.Context {
return context.WithValue(ctx, contextKey(RequestIDContextKey), rid)
}

// WithLogger stores the logger.
func WithLogger(ctx context.Context, l logrus.FieldLogger) context.Context {
return context.WithValue(ctx, contextKey("logger"), l)
Expand Down
21 changes: 21 additions & 0 deletions api/common/request_id_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package common

import (
"context"

"github.com/fnproject/fn/api/id"
)

// FnRequestID returns the passed value if that is not empty otherwise it generates a new unique ID
func FnRequestID(ridFound string) string {
if ridFound == "" {
return id.New().String()
}
return ridFound
}

//RequestIDFromContext extract the request id from the context
func RequestIDFromContext(ctx context.Context) string {
rid, _ := ctx.Value(contextKey(RequestIDContextKey)).(string)
return rid
}
2 changes: 2 additions & 0 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
EnvCert = "FN_NODE_CERT"
EnvCertKey = "FN_NODE_CERT_KEY"
EnvCertAuth = "FN_NODE_CERT_AUTHORITY"
// The header name of the incoming request which holds the request ID
EnvRidHeader = "FN_RID_HEADER"

EnvProcessCollectorList = "FN_PROCESS_COLLECTOR_LIST"
EnvLBPlacementAlg = "FN_PLACER"
Expand Down
27 changes: 27 additions & 0 deletions api/server/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,38 @@ import (
"fmt"
"net/http"

"github.com/fnproject/fn/api/common"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)

type ServerOption func(context.Context, *Server) error

//RIDProvider is used to manage request ID
type RIDProvider struct {
HeaderName string //The name of the header where the reques id is stored in the incoming request
RIDGenerator func(string) string // Function to generate the requestID
}

func WithRIDProvider(ridProvider *RIDProvider) ServerOption {
return func(ctx context.Context, s *Server) error {
s.Router.Use(withRIDProvider(ridProvider))
return nil
}
}

func withRIDProvider(ridp *RIDProvider) func(c *gin.Context) {
return func(c *gin.Context) {
rid := ridp.RIDGenerator(c.Request.Header.Get(ridp.HeaderName))
ctx := common.WithRequestID(c.Request.Context(), rid)
// We set the rid in the common logger so it is always logged when the common logger is used
l := common.Logger(ctx).WithFields(logrus.Fields{common.RequestIDContextKey: rid})
ctx = common.WithLogger(ctx, l)
c.Request = c.Request.WithContext(ctx)
c.Next()
}
}

func EnableShutdownEndpoint(ctx context.Context, halt context.CancelFunc) ServerOption {
return func(ctx context.Context, s *Server) error {
s.Router.GET("/shutdown", s.handleShutdown(halt))
Expand Down
4 changes: 4 additions & 0 deletions cmd/fnserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"github.com/fnproject/fn/api/agent/drivers/docker"
"github.com/fnproject/fn/api/logs/s3"
"github.com/fnproject/fn/api/server"
// The trace package is imported in several places by different dependencies and if we don't import explicity here it is
// initialized every time it is imported and that creates a panic at run time as we register multiple time the handler for
// /debug/requests. For example see: https://github.com/GoogleCloudPlatform/google-cloud-go/issues/663 and https://github.com/bradleyfalzon/gopherci/issues/101
_ "golang.org/x/net/trace"
// EXTENSIONS: Add extension imports here or use `fn build-server`. Learn more: https://github.com/fnproject/fn/blob/master/docs/operating/extending.md

_ "github.com/fnproject/fn/api/server/defaultexts"
Expand Down
36 changes: 34 additions & 2 deletions grpcutil/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import (
"time"

"github.com/fnproject/fn/api/common"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)

// DialWithBackoff creates a grpc connection using backoff strategy for reconnections
func DialWithBackoff(ctx context.Context, address string, creds credentials.TransportCredentials, timeout time.Duration, backoffCfg grpc.BackoffConfig) (*grpc.ClientConn, error) {
return dial(ctx, address, creds, timeout, grpc.WithBackoffConfig(backoffCfg))
func DialWithBackoff(ctx context.Context, address string, creds credentials.TransportCredentials, timeout time.Duration, backoffCfg grpc.BackoffConfig, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithBackoffConfig(backoffCfg))
return dial(ctx, address, creds, timeout, opts...)
}

// uses grpc connection backoff protocol https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
Expand Down Expand Up @@ -81,3 +85,31 @@ func CreateCredentials(certPath, keyPath, caCertPath, certCommonName string) (cr
RootCAs: certPool,
}), nil
}

// RIDStreamServerInterceptor is a gRPC stream interceptor which gets the request ID out of the context and put a logger with request ID logged into the common logger in the context
func RIDStreamServerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
newStream := grpc_middleware.WrapServerStream(stream)
rid := ridFromMetadata(stream.Context())
if rid != "" {
newStream.WrappedContext, _ = common.LoggerWithFields(newStream.WrappedContext, logrus.Fields{common.RequestIDContextKey: rid})
}
return handler(srv, newStream)
}

// RIDUnaryServerInterceptor is an unary gRPC interceptor which gets the request ID out of the context and put a logger with request ID logged into the common logger in the context
func RIDUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
rid := ridFromMetadata(ctx)
if rid != "" {
ctx, _ = common.LoggerWithFields(ctx, logrus.Fields{common.RequestIDContextKey: rid})
}
return handler(ctx, req)
}

func ridFromMetadata(ctx context.Context) string {
rid := ""
md, ok := metadata.FromIncomingContext(ctx)
if ok && len(md[common.RequestIDContextKey]) > 0 {
rid = md[common.RequestIDContextKey][0]
}
return rid
}
33 changes: 33 additions & 0 deletions grpcutil/dial_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package grpcutil

import (
"context"
"testing"

"github.com/fnproject/fn/api/common"
"google.golang.org/grpc/metadata"
)

func TestRIDFoundInMetadata(t *testing.T) {
expected := "request-id-test"
ctx := context.Background()
m := make(map[string]string)
m[common.RequestIDContextKey] = expected
md := metadata.New(m)
incomingCtx := metadata.NewIncomingContext(ctx, md)
actual := ridFromMetadata(incomingCtx)
if actual != expected {
t.Fatalf("Wrong request ID expected '%s' got '%s'", expected, actual)
}
}

func TestRIDNotFoundInMetadata(t *testing.T) {
ctx := context.Background()
m := make(map[string]string)
md := metadata.New(m)
incomingCtx := metadata.NewIncomingContext(ctx, md)
actual := ridFromMetadata(incomingCtx)
if actual != "" {
t.Fatalf("Expected empty request ID got '%s'", actual)
}
}
6 changes: 6 additions & 0 deletions test/fn-system-tests/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/fnproject/fn/api/agent"
"github.com/fnproject/fn/api/agent/hybrid"
"github.com/fnproject/fn/api/common"
pool "github.com/fnproject/fn/api/runnerpool"
"github.com/fnproject/fn/api/server"
_ "github.com/fnproject/fn/api/server/defaultexts"
Expand Down Expand Up @@ -197,6 +198,11 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) {
opts = append(opts, server.WithMQURL(""))
opts = append(opts, server.WithLogURL(""))
opts = append(opts, server.EnableShutdownEndpoint(ctx, func() {})) // TODO: do it properly
ridProvider := &server.RIDProvider{
HeaderName: "fn_request_id",
RIDGenerator: common.FnRequestID,
}
opts = append(opts, server.WithRIDProvider(ridProvider))
opts = append(opts, server.WithPrometheus())

apiURL := "http://127.0.0.1:8085"
Expand Down
Loading

0 comments on commit e637661

Please sign in to comment.