Skip to content

Commit

Permalink
Added filter chain for HTTP module and filters for tracer and error h…
Browse files Browse the repository at this point in the history
…andling (uber-go#71)

* Added filter chain for HTTP module and filters for tracer and error

* Added http test to check that tracer has been set in the request context

* License and doc updates

* Undo doc changes

* Added filters test

* Remove redundant todo comments

* Inject the span context for the request in the tracing filter

* Use global logger and rename to panicFilter
  • Loading branch information
madhuravi committed Nov 18, 2016
1 parent debfcc9 commit d55c9e4
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 46 deletions.
1 change: 0 additions & 1 deletion modules/rpc/yarpc.go
Expand Up @@ -121,7 +121,6 @@ func (m *YarpcModule) Start(readyCh chan<- struct{}) <-chan error {

interceptor := yarpc.Interceptors(m.interceptors...)

// TODO(ai/madhu) pass option for opentracing to NewDispatcher
m.rpc, err = _dispatcherFn(m.Host(), yarpc.Config{
Name: m.config.AdvertiseName,
Inbounds: []transport.Inbound{
Expand Down
107 changes: 107 additions & 0 deletions modules/uhttp/filters.go
@@ -0,0 +1,107 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package uhttp

import (
"fmt"
"net/http"

"go.uber.org/fx/core/ulog"

"golang.org/x/net/context"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

// Filter applies filters on requests, request contexts or responses such as
// adding tracing to the context
type Filter interface {
Apply(w http.ResponseWriter, r *http.Request, next http.Handler)
}

// FilterFunc is an adaptor to call normal functions to apply filters
type FilterFunc func(w http.ResponseWriter, r *http.Request, next http.Handler)

// Apply implements Apply from the Filter interface and simply delegates to the function
func (f FilterFunc) Apply(w http.ResponseWriter, r *http.Request, next http.Handler) {
f(w, r, next)
}

type tracerFilter struct {
tracer opentracing.Tracer
}

func (t tracerFilter) Apply(w http.ResponseWriter, r *http.Request, next http.Handler) {
operationName := r.Method
carrier := opentracing.HTTPHeadersCarrier(r.Header)
spanCtx, err := t.tracer.Extract(opentracing.HTTPHeaders, carrier)
if err != nil && err != opentracing.ErrSpanContextNotFound {
// TODO (madhu): Once context propagation is done, use the context logger instead
ulog.Logger().Info("Malformed inbound tracing context: %s", err.Error())
}
span := t.tracer.StartSpan(operationName, ext.RPCServerOption(spanCtx))
ext.HTTPUrl.Set(span, r.URL.String())
defer span.Finish()
ctx := r.Context()
if ctx == nil {
ctx = context.Background()
}
ctx = opentracing.ContextWithSpan(ctx, span)
r = r.WithContext(ctx)
next.ServeHTTP(w, r)
}

// panicFilter handles any panics and return an error
func panicFilter(w http.ResponseWriter, r *http.Request, next http.Handler) {
defer func() {
if err := recover(); err != nil {
// TODO(ai) log and add stats to this
w.Header().Add(ContentType, ContentTypeText)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Server error: %+v", err)
}
}()
next.ServeHTTP(w, r)
}

func newExecutionChain(filters []Filter, finalHandler http.Handler) executionChain {
return executionChain{
filters: filters,
finalHandler: finalHandler,
}
}

type executionChain struct {
currentFilter int
filters []Filter
finalHandler http.Handler
}

func (ec executionChain) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if ec.currentFilter < len(ec.filters) {
filter := ec.filters[ec.currentFilter]
ec.currentFilter++
filter.Apply(w, req, ec)
} else {
ec.finalHandler.ServeHTTP(w, req)
}
}
64 changes: 64 additions & 0 deletions modules/uhttp/filters_test.go
@@ -0,0 +1,64 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package uhttp

import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
)

var (
noopTracer = opentracing.NoopTracer{}
)

func TestExecutionChain(t *testing.T) {
chain := newExecutionChain([]Filter{}, getNoopHandler())
response := testServeHTTP(chain)
assert.True(t, strings.Contains(response.Body.String(), "filters ok"))
}

func TestExecutionChainFilters(t *testing.T) {
chain := newExecutionChain(
[]Filter{tracerFilter{tracer: noopTracer}, FilterFunc(panicFilter)},
getNoopHandler(),
)
response := testServeHTTP(chain)
assert.Contains(t, response.Body.String(), "filters ok")
}

func testServeHTTP(chain executionChain) *httptest.ResponseRecorder {
request := httptest.NewRequest("", "http://filters", nil)
response := httptest.NewRecorder()
chain.ServeHTTP(response, request)
return response
}

func getNoopHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "filters ok")
})
}
4 changes: 3 additions & 1 deletion modules/uhttp/health.go
Expand Up @@ -25,7 +25,9 @@ import (
"net/http"
)

func handleHealth(w http.ResponseWriter, r *http.Request) {
type healthHandler struct{}

func (h healthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO(ai) import more sophisticated health mechanism from internal libraries
fmt.Fprintf(w, "OK\n")
}
60 changes: 24 additions & 36 deletions modules/uhttp/http.go
Expand Up @@ -82,6 +82,7 @@ type Module struct {
listener net.Listener
handlers []RouteHandler
listenMu sync.RWMutex
filters []Filter
}

var _ service.Module = &Module{}
Expand All @@ -94,11 +95,11 @@ type Config struct {
Debug *bool `yaml:"debug"`
}

// CreateHTTPRegistrantsFunc returns a slice of registrants from a service host
type CreateHTTPRegistrantsFunc func(service service.Host) []RouteHandler
// GetHandlersFunc returns a slice of registrants from a service host
type GetHandlersFunc func(service service.Host) []RouteHandler

// New returns a new HTTP module
func New(hookup CreateHTTPRegistrantsFunc, options ...modules.Option) service.ModuleCreateFunc {
func New(hookup GetHandlersFunc, options ...modules.Option) service.ModuleCreateFunc {
return func(mi service.ModuleCreateInfo) ([]service.Module, error) {
mod, err := newModule(mi, hookup, options...)
if err != nil {
Expand All @@ -110,7 +111,7 @@ func New(hookup CreateHTTPRegistrantsFunc, options ...modules.Option) service.Mo

func newModule(
mi service.ModuleCreateInfo,
createService CreateHTTPRegistrantsFunc,
getHandlers GetHandlersFunc,
options ...modules.Option,
) (*Module, error) {
// setup config defaults
Expand All @@ -123,9 +124,13 @@ func newModule(
mi.Name = "http"
}

handlers := addHealth(getHandlers(mi.Host))
// TODO (madhu): Add other middleware - logging, metrics.
// TODO (madhu): Once context propagation is done, change tracerFilter to take in the context
module := &Module{
ModuleBase: *modules.NewModuleBase(ModuleType, mi.Name, mi.Host, []string{}),
handlers: createService(mi.Host),
handlers: handlers,
filters: []Filter{tracerFilter{tracer: mi.Host.Tracer()}, FilterFunc(panicFilter)},
}

err := module.Host().Config().GetValue(getConfigKey(mi.Name)).PopulateStruct(cfg)
Expand Down Expand Up @@ -159,26 +164,13 @@ func (m *Module) Start(ready chan<- struct{}) <-chan error {

m.mux.Handle("/", m.router)

healthFound := false
for _, h := range m.handlers {
if h.Path == healthPath {
healthFound = true
}
handle := h.Handler
handle = panicWrap(handle)
// TODO other middlewares, logging, tracing?
route := m.router.Handle(h.Path, handle)
// apply all route options
for _, opt := range h.Options {
opt(Route{route})
}
}

if !healthFound {
m.router.HandleFunc(healthPath, handleHealth)
handle := newExecutionChain(m.filters, h.Handler)
m.router.Handle(h.Path, handle)
}

// Debug is opt-out
// TODO(madhu): Apply filters to the debug handler too?
if m.config.Debug == nil || *m.config.Debug {
m.router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)
}
Expand Down Expand Up @@ -245,20 +237,16 @@ func getConfigKey(name string) string {
return fmt.Sprintf("modules.%s", name)
}

// Middlewares

// handle any panics and return an error
func panicWrap(h http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
// TODO(ai) log and add stats to this
w.Header().Add(ContentType, ContentTypeText)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Server error: %+v", err)
}
}()

h.ServeHTTP(w, r)
// addHealth adds in the default if health handler is not set
func addHealth(handlers []RouteHandler) []RouteHandler {
healthFound := false
for _, h := range handlers {
if h.Path == healthPath {
healthFound = true
}
}
if !healthFound {
handlers = append(handlers, NewRouteHandler(healthPath, healthHandler{}))
}
return handlers
}
26 changes: 25 additions & 1 deletion modules/uhttp/http_test.go
Expand Up @@ -35,6 +35,7 @@ import (
"go.uber.org/fx/service"
. "go.uber.org/fx/service/testutils"

"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -64,6 +65,15 @@ func TestHTTPModule_Panic_OK(t *testing.T) {
})
}

func TestHTTPModule_Tracer(t *testing.T) {
withModule(t, registerTracerCheckHandler, nil, false, func(m *Module) {
assert.NotNil(t, m)
makeRequest(m, "GET", "/", nil, func(r *http.Response) {
assert.Equal(t, http.StatusOK, r.StatusCode, "Expected 200 with tracer check")
})
})
}

func TestHTTPModule_StartsAndStops(t *testing.T) {
withModule(t, registerPanic, nil, false, func(m *Module) {
assert.True(t, m.IsRunning(), "Start should be successful")
Expand Down Expand Up @@ -130,7 +140,7 @@ func configOption() service.Option {

func withModule(
t testing.TB,
hookup CreateHTTPRegistrantsFunc,
hookup GetHandlersFunc,
options []modules.Option,
expectError bool,
fn func(*Module),
Expand Down Expand Up @@ -214,6 +224,20 @@ func makeSingleHandler(path string, fn func(http.ResponseWriter, *http.Request))
}
}

func registerTracerCheckHandler(host service.Host) []RouteHandler {
return makeSingleHandler("/", func(_ http.ResponseWriter, r *http.Request) {
span := opentracing.SpanFromContext(r.Context())
if span == nil {
panic(fmt.Sprintf("Intentional panic, invalid span: %v", span))
} else if span.Tracer() != host.Tracer() {
panic(fmt.Sprintf(
"Intentional panic, expected tracer: %v different from actual tracer: %v", span.Tracer(),
host.Tracer(),
))
}
})
}

func registerCustomHealth(_ service.Host) []RouteHandler {
return makeSingleHandler("/health", func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "not ok")
Expand Down
9 changes: 2 additions & 7 deletions modules/uhttp/routes.go
Expand Up @@ -26,9 +26,6 @@ import (
"github.com/gorilla/mux"
)

// A RouteOption gives you the ability to mangle routes
type RouteOption func(r Route) Route

// FromGorilla turns a gorilla mux route into an UberFx route
func FromGorilla(r *mux.Route) Route {
return Route{
Expand All @@ -40,15 +37,13 @@ func FromGorilla(r *mux.Route) Route {
type RouteHandler struct {
Path string
Handler http.Handler
Options []RouteOption
}

// NewRouteHandler creates a route handler given the options
func NewRouteHandler(path string, handler http.Handler, options ...RouteOption) RouteHandler {
// NewRouteHandler creates a route handler
func NewRouteHandler(path string, handler http.Handler) RouteHandler {
return RouteHandler{
Path: path,
Handler: handler,
Options: options,
}
}

Expand Down

0 comments on commit d55c9e4

Please sign in to comment.