From d55c9e477a9e24857b8eb1ec782f63902cf24d5d Mon Sep 17 00:00:00 2001 From: madhuravi Date: Fri, 18 Nov 2016 12:49:31 -0800 Subject: [PATCH] Added filter chain for HTTP module and filters for tracer and error handling (#71) * Added filter chain for HTTP module and filters for tracer and error * Added http test to check that tracer has been set in the request context * License and doc updates * Undo doc changes * Added filters test * Remove redundant todo comments * Inject the span context for the request in the tracing filter * Use global logger and rename to panicFilter --- modules/rpc/yarpc.go | 1 - modules/uhttp/filters.go | 107 ++++++++++++++++++++++++++++++++++ modules/uhttp/filters_test.go | 64 ++++++++++++++++++++ modules/uhttp/health.go | 4 +- modules/uhttp/http.go | 60 ++++++++----------- modules/uhttp/http_test.go | 26 ++++++++- modules/uhttp/routes.go | 9 +-- 7 files changed, 225 insertions(+), 46 deletions(-) create mode 100644 modules/uhttp/filters.go create mode 100644 modules/uhttp/filters_test.go diff --git a/modules/rpc/yarpc.go b/modules/rpc/yarpc.go index ddf9b08c..40f5fe19 100644 --- a/modules/rpc/yarpc.go +++ b/modules/rpc/yarpc.go @@ -121,7 +121,6 @@ func (m *YarpcModule) Start(readyCh chan<- struct{}) <-chan error { interceptor := yarpc.Interceptors(m.interceptors...) - // TODO(ai/madhu) pass option for opentracing to NewDispatcher m.rpc, err = _dispatcherFn(m.Host(), yarpc.Config{ Name: m.config.AdvertiseName, Inbounds: []transport.Inbound{ diff --git a/modules/uhttp/filters.go b/modules/uhttp/filters.go new file mode 100644 index 00000000..e4bcd3c8 --- /dev/null +++ b/modules/uhttp/filters.go @@ -0,0 +1,107 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package uhttp + +import ( + "fmt" + "net/http" + + "go.uber.org/fx/core/ulog" + + "golang.org/x/net/context" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +// Filter applies filters on requests, request contexts or responses such as +// adding tracing to the context +type Filter interface { + Apply(w http.ResponseWriter, r *http.Request, next http.Handler) +} + +// FilterFunc is an adaptor to call normal functions to apply filters +type FilterFunc func(w http.ResponseWriter, r *http.Request, next http.Handler) + +// Apply implements Apply from the Filter interface and simply delegates to the function +func (f FilterFunc) Apply(w http.ResponseWriter, r *http.Request, next http.Handler) { + f(w, r, next) +} + +type tracerFilter struct { + tracer opentracing.Tracer +} + +func (t tracerFilter) Apply(w http.ResponseWriter, r *http.Request, next http.Handler) { + operationName := r.Method + carrier := opentracing.HTTPHeadersCarrier(r.Header) + spanCtx, err := t.tracer.Extract(opentracing.HTTPHeaders, carrier) + if err != nil && err != opentracing.ErrSpanContextNotFound { + // TODO (madhu): Once context propagation is done, use the context logger instead + ulog.Logger().Info("Malformed inbound tracing context: %s", err.Error()) + } + span := t.tracer.StartSpan(operationName, ext.RPCServerOption(spanCtx)) + ext.HTTPUrl.Set(span, r.URL.String()) + defer span.Finish() + ctx := r.Context() + if ctx == nil { + ctx = context.Background() + } + ctx = opentracing.ContextWithSpan(ctx, span) + r = r.WithContext(ctx) + next.ServeHTTP(w, r) +} + +// panicFilter handles any panics and return an error +func panicFilter(w http.ResponseWriter, r *http.Request, next http.Handler) { + defer func() { + if err := recover(); err != nil { + // TODO(ai) log and add stats to this + w.Header().Add(ContentType, ContentTypeText) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Server error: %+v", err) + } + }() + next.ServeHTTP(w, r) +} + +func newExecutionChain(filters []Filter, finalHandler http.Handler) executionChain { + return executionChain{ + filters: filters, + finalHandler: finalHandler, + } +} + +type executionChain struct { + currentFilter int + filters []Filter + finalHandler http.Handler +} + +func (ec executionChain) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if ec.currentFilter < len(ec.filters) { + filter := ec.filters[ec.currentFilter] + ec.currentFilter++ + filter.Apply(w, req, ec) + } else { + ec.finalHandler.ServeHTTP(w, req) + } +} diff --git a/modules/uhttp/filters_test.go b/modules/uhttp/filters_test.go new file mode 100644 index 00000000..740de9cf --- /dev/null +++ b/modules/uhttp/filters_test.go @@ -0,0 +1,64 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package uhttp + +import ( + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/opentracing/opentracing-go" + "github.com/stretchr/testify/assert" +) + +var ( + noopTracer = opentracing.NoopTracer{} +) + +func TestExecutionChain(t *testing.T) { + chain := newExecutionChain([]Filter{}, getNoopHandler()) + response := testServeHTTP(chain) + assert.True(t, strings.Contains(response.Body.String(), "filters ok")) +} + +func TestExecutionChainFilters(t *testing.T) { + chain := newExecutionChain( + []Filter{tracerFilter{tracer: noopTracer}, FilterFunc(panicFilter)}, + getNoopHandler(), + ) + response := testServeHTTP(chain) + assert.Contains(t, response.Body.String(), "filters ok") +} + +func testServeHTTP(chain executionChain) *httptest.ResponseRecorder { + request := httptest.NewRequest("", "http://filters", nil) + response := httptest.NewRecorder() + chain.ServeHTTP(response, request) + return response +} + +func getNoopHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, "filters ok") + }) +} diff --git a/modules/uhttp/health.go b/modules/uhttp/health.go index 1191f7af..7b2bbafa 100644 --- a/modules/uhttp/health.go +++ b/modules/uhttp/health.go @@ -25,7 +25,9 @@ import ( "net/http" ) -func handleHealth(w http.ResponseWriter, r *http.Request) { +type healthHandler struct{} + +func (h healthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // TODO(ai) import more sophisticated health mechanism from internal libraries fmt.Fprintf(w, "OK\n") } diff --git a/modules/uhttp/http.go b/modules/uhttp/http.go index 21da2a98..8af1d106 100644 --- a/modules/uhttp/http.go +++ b/modules/uhttp/http.go @@ -82,6 +82,7 @@ type Module struct { listener net.Listener handlers []RouteHandler listenMu sync.RWMutex + filters []Filter } var _ service.Module = &Module{} @@ -94,11 +95,11 @@ type Config struct { Debug *bool `yaml:"debug"` } -// CreateHTTPRegistrantsFunc returns a slice of registrants from a service host -type CreateHTTPRegistrantsFunc func(service service.Host) []RouteHandler +// GetHandlersFunc returns a slice of registrants from a service host +type GetHandlersFunc func(service service.Host) []RouteHandler // New returns a new HTTP module -func New(hookup CreateHTTPRegistrantsFunc, options ...modules.Option) service.ModuleCreateFunc { +func New(hookup GetHandlersFunc, options ...modules.Option) service.ModuleCreateFunc { return func(mi service.ModuleCreateInfo) ([]service.Module, error) { mod, err := newModule(mi, hookup, options...) if err != nil { @@ -110,7 +111,7 @@ func New(hookup CreateHTTPRegistrantsFunc, options ...modules.Option) service.Mo func newModule( mi service.ModuleCreateInfo, - createService CreateHTTPRegistrantsFunc, + getHandlers GetHandlersFunc, options ...modules.Option, ) (*Module, error) { // setup config defaults @@ -123,9 +124,13 @@ func newModule( mi.Name = "http" } + handlers := addHealth(getHandlers(mi.Host)) + // TODO (madhu): Add other middleware - logging, metrics. + // TODO (madhu): Once context propagation is done, change tracerFilter to take in the context module := &Module{ ModuleBase: *modules.NewModuleBase(ModuleType, mi.Name, mi.Host, []string{}), - handlers: createService(mi.Host), + handlers: handlers, + filters: []Filter{tracerFilter{tracer: mi.Host.Tracer()}, FilterFunc(panicFilter)}, } err := module.Host().Config().GetValue(getConfigKey(mi.Name)).PopulateStruct(cfg) @@ -159,26 +164,13 @@ func (m *Module) Start(ready chan<- struct{}) <-chan error { m.mux.Handle("/", m.router) - healthFound := false for _, h := range m.handlers { - if h.Path == healthPath { - healthFound = true - } - handle := h.Handler - handle = panicWrap(handle) - // TODO other middlewares, logging, tracing? - route := m.router.Handle(h.Path, handle) - // apply all route options - for _, opt := range h.Options { - opt(Route{route}) - } - } - - if !healthFound { - m.router.HandleFunc(healthPath, handleHealth) + handle := newExecutionChain(m.filters, h.Handler) + m.router.Handle(h.Path, handle) } // Debug is opt-out + // TODO(madhu): Apply filters to the debug handler too? if m.config.Debug == nil || *m.config.Debug { m.router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux) } @@ -245,20 +237,16 @@ func getConfigKey(name string) string { return fmt.Sprintf("modules.%s", name) } -// Middlewares - -// handle any panics and return an error -func panicWrap(h http.Handler) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - defer func() { - if err := recover(); err != nil { - // TODO(ai) log and add stats to this - w.Header().Add(ContentType, ContentTypeText) - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Server error: %+v", err) - } - }() - - h.ServeHTTP(w, r) +// addHealth adds in the default if health handler is not set +func addHealth(handlers []RouteHandler) []RouteHandler { + healthFound := false + for _, h := range handlers { + if h.Path == healthPath { + healthFound = true + } + } + if !healthFound { + handlers = append(handlers, NewRouteHandler(healthPath, healthHandler{})) } + return handlers } diff --git a/modules/uhttp/http_test.go b/modules/uhttp/http_test.go index 3d1990dc..ae7cbe44 100644 --- a/modules/uhttp/http_test.go +++ b/modules/uhttp/http_test.go @@ -35,6 +35,7 @@ import ( "go.uber.org/fx/service" . "go.uber.org/fx/service/testutils" + "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -64,6 +65,15 @@ func TestHTTPModule_Panic_OK(t *testing.T) { }) } +func TestHTTPModule_Tracer(t *testing.T) { + withModule(t, registerTracerCheckHandler, nil, false, func(m *Module) { + assert.NotNil(t, m) + makeRequest(m, "GET", "/", nil, func(r *http.Response) { + assert.Equal(t, http.StatusOK, r.StatusCode, "Expected 200 with tracer check") + }) + }) +} + func TestHTTPModule_StartsAndStops(t *testing.T) { withModule(t, registerPanic, nil, false, func(m *Module) { assert.True(t, m.IsRunning(), "Start should be successful") @@ -130,7 +140,7 @@ func configOption() service.Option { func withModule( t testing.TB, - hookup CreateHTTPRegistrantsFunc, + hookup GetHandlersFunc, options []modules.Option, expectError bool, fn func(*Module), @@ -214,6 +224,20 @@ func makeSingleHandler(path string, fn func(http.ResponseWriter, *http.Request)) } } +func registerTracerCheckHandler(host service.Host) []RouteHandler { + return makeSingleHandler("/", func(_ http.ResponseWriter, r *http.Request) { + span := opentracing.SpanFromContext(r.Context()) + if span == nil { + panic(fmt.Sprintf("Intentional panic, invalid span: %v", span)) + } else if span.Tracer() != host.Tracer() { + panic(fmt.Sprintf( + "Intentional panic, expected tracer: %v different from actual tracer: %v", span.Tracer(), + host.Tracer(), + )) + } + }) +} + func registerCustomHealth(_ service.Host) []RouteHandler { return makeSingleHandler("/health", func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, "not ok") diff --git a/modules/uhttp/routes.go b/modules/uhttp/routes.go index 1f7d7c56..a2c95638 100644 --- a/modules/uhttp/routes.go +++ b/modules/uhttp/routes.go @@ -26,9 +26,6 @@ import ( "github.com/gorilla/mux" ) -// A RouteOption gives you the ability to mangle routes -type RouteOption func(r Route) Route - // FromGorilla turns a gorilla mux route into an UberFx route func FromGorilla(r *mux.Route) Route { return Route{ @@ -40,15 +37,13 @@ func FromGorilla(r *mux.Route) Route { type RouteHandler struct { Path string Handler http.Handler - Options []RouteOption } -// NewRouteHandler creates a route handler given the options -func NewRouteHandler(path string, handler http.Handler, options ...RouteOption) RouteHandler { +// NewRouteHandler creates a route handler +func NewRouteHandler(path string, handler http.Handler) RouteHandler { return RouteHandler{ Path: path, Handler: handler, - Options: options, } }