Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 554 Added compiled glob matching using LRU Cache #615

Merged
merged 19 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
ProfilePath string
Insecure bool
GlobMatchingDisabled bool
GlobCacheSize int
}

type CertSource struct {
Expand Down
2 changes: 2 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,6 @@ var defaultConfig = &Config{
SpanHost: "localhost:9998",
TraceID128Bit: true,
},

GlobCacheSize: 1000,
}
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.StringVar(&cfg.Tracing.SpanHost, "tracing.SpanHost", defaultConfig.Tracing.SpanHost, "Host:Port info to add to spans")
f.BoolVar(&cfg.Tracing.TraceID128Bit, "tracing.TraceID128Bit", defaultConfig.Tracing.TraceID128Bit, "Generate 128 bit trace IDs")
f.BoolVar(&cfg.GlobMatchingDisabled, "glob.matching.disabled", defaultConfig.GlobMatchingDisabled, "Disable Glob Matching on routes, one of [true, false]")
f.IntVar(&cfg.GlobCacheSize, "glob.cache.size", defaultConfig.GlobCacheSize, "sets the size of the glob cache")

f.StringVar(&cfg.Registry.Custom.Host, "registry.custom.host", defaultConfig.Registry.Custom.Host, "custom back end hostname/port")
f.StringVar(&cfg.Registry.Custom.Scheme, "registry.custom.scheme", defaultConfig.Registry.Custom.Scheme, "custom back end scheme - http/https")
Expand Down
7 changes: 7 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,13 @@ func TestLoad(t *testing.T) {
cfg: func(cfg *Config) *Config { return nil },
err: errors.New("missing 'file' in auth 'foo'"),
},
{
args: []string{"-glob.cache.size", "1000"},
cfg: func(cfg *Config) *Config {
cfg.GlobCacheSize = 1000
return cfg
},
},
{
args: []string{"-cfg"},
cfg: func(cfg *Config) *Config { return nil },
Expand Down
9 changes: 9 additions & 0 deletions docs/content/ref/glob.cache.size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
title: "glob.cache.size"
---

`glob.cache.size` Sets the globCache size used for matching on route lookups.

The default is

glob.cache.size = 1000
12 changes: 12 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@
#
# proxy.gzip.contenttype =


# proxy.auth configures one or more auth schemes.
#
# Each auth scheme is configured with a list of
Expand Down Expand Up @@ -524,6 +525,7 @@
# proxy.auth = name=mybasicauth;type=basic;file=p/creds.htpasswd
# name=myotherauth;type=basic;file=p/other-creds.htpasswd;realm=myrealm


# log.access.format configures the format of the access log.
#
# If the value is either 'common' or 'combined' then the logs are written in
Expand Down Expand Up @@ -722,6 +724,7 @@
#
# registry.consul.tls.cafile =


# registry.consul.tls.capath the path to the folder containing CA certificates.
#
# This is the full path to the folder with CA certificates while using TLS transport to
Expand Down Expand Up @@ -890,6 +893,7 @@
#
# registry.consul.serviceMonitors = 1


# registry.consul.pollInterval configures the poll interval
# for route updates. If Poll interval is set to 0 the updates will
# be disabled and fall back to blocking queries. Other values can
Expand Down Expand Up @@ -947,6 +951,7 @@
#
# registry.custom.path =


# registry.custom.queryparams is the query parameters used in the custom back
# end API Call
#
Expand All @@ -971,6 +976,12 @@
#
# glob.matching.disabled = false

# glob.cache.size sets the globCache size used for matching on route lookups.
#
# The default is
#
# glob.cache.size = 1000


# metrics.target configures the backend the metrics values are
# sent to.
Expand Down Expand Up @@ -1246,6 +1257,7 @@
#
# tracing.TracingEnabled = false


# tracing.CollectorType sets what type of collector is used.
# Currently only two types are supported http and kafka
#
Expand Down
11 changes: 10 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func main() {
}

func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption {

//Init Glob Cache
globCache := route.NewGlobCache(cfg.GlobCacheSize)

statsHandler := &proxy.GrpcStatsHandler{
Connect: metrics.DefaultRegistry.GetCounter("grpc.conn"),
Request: metrics.DefaultRegistry.GetTimer("grpc.requests"),
Expand All @@ -157,6 +161,7 @@ func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption {
proxyInterceptor := proxy.GrpcProxyInterceptor{
Config: cfg,
StatsHandler: statsHandler,
GlobCache: globCache,
}

handler := grpc_proxy.TransparentHandler(proxy.GetGRPCDirector(tlscfg))
Expand All @@ -171,6 +176,10 @@ func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption {

func newHTTPProxy(cfg *config.Config) http.Handler {
var w io.Writer

//Init Glob Cache
globCache := route.NewGlobCache(cfg.GlobCacheSize)

switch cfg.Log.AccessTarget {
case "":
log.Printf("[INFO] Access logging disabled")
Expand Down Expand Up @@ -223,7 +232,7 @@ func newHTTPProxy(cfg *config.Config) http.Handler {
Transport: newTransport(nil),
InsecureTransport: newTransport(&tls.Config{InsecureSkipVerify: true}),
Lookup: func(r *http.Request) *route.Target {
t := route.GetTable().Lookup(r, r.Header.Get("trace"), pick, match, cfg.GlobMatchingDisabled)
t := route.GetTable().Lookup(r, r.Header.Get("trace"), pick, match, globCache, cfg.GlobMatchingDisabled)
if t == nil {
notFound.Inc(1)
log.Print("[WARN] No route for ", r.Host, r.URL)
Expand Down
3 changes: 2 additions & 1 deletion proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func GetGRPCDirector(tlscfg *tls.Config) func(ctx context.Context, fullMethodNam
type GrpcProxyInterceptor struct {
Config *config.Config
StatsHandler *GrpcStatsHandler
GlobCache *route.GlobCache
}

type targetKey struct{}
Expand Down Expand Up @@ -153,7 +154,7 @@ func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string)
Header: headers,
}

return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.Config.GlobMatchingDisabled), nil
return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.GlobCache, g.Config.GlobMatchingDisabled), nil
}

type GrpcStatsHandler struct {
Expand Down
17 changes: 10 additions & 7 deletions proxy/http_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
globDisabled = true
)

//Global GlobCache for Testing
var globCache = route.NewGlobCache(1000)

func TestProxyProducesCorrectXForwardedSomethingHeader(t *testing.T) {
var hdr http.Header = make(http.Header)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -194,7 +197,7 @@ func TestProxyStripsPath(t *testing.T) {
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add mock /foo/bar " + server.URL + ` opts "strip=/foo"`))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -230,7 +233,7 @@ func TestProxyHost(t *testing.T) {
},
},
Lookup: func(r *http.Request) *route.Target {
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -277,7 +280,7 @@ func TestHostRedirect(t *testing.T) {
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
r.Host = "c.com"
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -316,7 +319,7 @@ func TestPathRedirect(t *testing.T) {
proxy := httptest.NewServer(&HTTPProxy{
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -484,7 +487,7 @@ func TestProxyHTTPSUpstream(t *testing.T) {
Transport: &http.Transport{TLSClientConfig: tlsClientConfig()},
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add srv / " + server.URL + ` opts "proto=https"`))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand All @@ -511,7 +514,7 @@ func TestProxyHTTPSUpstreamSkipVerify(t *testing.T) {
},
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add srv / " + server.URL + ` opts "proto=https tlsskipverify=true"`))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -712,7 +715,7 @@ func BenchmarkProxyLogger(b *testing.B) {
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add mock / " + server.URL))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
Logger: l,
}
Expand Down
4 changes: 1 addition & 3 deletions proxy/listen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ func TestGracefulShutdown(t *testing.T) {
}))
defer srv.Close()

globDisabled := false

// start proxy
addr := "127.0.0.1:57777"
var wg sync.WaitGroup
Expand All @@ -35,7 +33,7 @@ func TestGracefulShutdown(t *testing.T) {
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add svc / " + srv.URL))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globDisabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
}
l := config.Listen{Addr: addr}
Expand Down
6 changes: 2 additions & 4 deletions proxy/ws_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ func TestProxyWSUpstream(t *testing.T) {
defer wssServer.Close()
t.Log("Started WSS server: ", wssServer.URL)

globDisabled := false

routes := "route add ws /ws " + wsServer.URL + "\n"
routes += "route add ws /wss " + wssServer.URL + ` opts "proto=https"` + "\n"
routes += "route add ws /insecure " + wssServer.URL + ` opts "proto=https tlsskipverify=true"` + "\n"
Expand All @@ -46,7 +44,7 @@ func TestProxyWSUpstream(t *testing.T) {
InsecureTransport: &http.Transport{TLSClientConfig: tlsInsecureConfig()},
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString(routes))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globDisabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer httpProxy.Close()
Expand All @@ -58,7 +56,7 @@ func TestProxyWSUpstream(t *testing.T) {
InsecureTransport: &http.Transport{TLSClientConfig: tlsInsecureConfig()},
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString(routes))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globDisabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
httpsProxy.TLS = tlsServerConfig()
Expand Down
2 changes: 0 additions & 2 deletions registry/consul/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ func (w *ServiceMonitor) Watch(updates chan string) {
} else {
q = &api.QueryOptions{RequireConsistent: true, WaitIndex: lastIndex}
}

checks, meta, err := w.client.Health().State("any", q)
if err != nil {
log.Printf("[WARN] consul: Error fetching health state. %v", err)
time.Sleep(time.Second)
continue
}

log.Printf("[DEBUG] consul: Health changed to #%d", meta.LastIndex)

// determine which services have passing health checks
Expand Down
65 changes: 65 additions & 0 deletions route/glob_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package route

import (
"github.com/gobwas/glob"
"sync"
)

// GlobCache implements an LRU cache for compiled glob patterns.
type GlobCache struct {
// m maps patterns to compiled glob matchers.
m sync.Map

// l contains the added patterns and serves as an LRU cache.
// l has a fixed size and is initialized in the constructor.
l []string

// h is the first element in l.
h int

// n is the number of elements in l.
n int
}

func NewGlobCache(size int) *GlobCache {
return &GlobCache{
l: make([]string, size),
}
}

// Get returns the compiled glob pattern if it compiled without
// error. Otherwise, the function returns nil. If the pattern
// is not in the cache it will be added.
func (c *GlobCache) Get(pattern string) (glob.Glob, error) {
// fast path
if glb, ok := c.m.Load(pattern); ok {
//Type Assert the returned interface{}
return glb.(glob.Glob), nil
}

// try to compile pattern
glbCompiled, err := glob.Compile(pattern)
if err != nil {
return nil, err
}

// if the LRU buffer is not full just append
// the element to the buffer.
if c.n < len(c.l) {
c.m.Store(pattern, glbCompiled)
c.l[c.n] = pattern
c.n++
return glbCompiled, nil
}

// otherwise, remove the oldest element and move
// the head. Note that once the buffer is full
// (c.n == len(c.l)) it will never become smaller
// again.
// TODO add logging for cache full - How will this impact performance
c.m.Delete(c.l[c.h])
c.m.Store(pattern, glbCompiled)
c.l[c.h] = pattern
c.h = (c.h + 1) % c.n
return glbCompiled, nil
}
Loading