Skip to content

Commit

Permalink
add connection pooling for upstream grpc connections
Browse files Browse the repository at this point in the history
  • Loading branch information
andyroyle committed Nov 28, 2018
1 parent 8705058 commit 4caac25
Showing 1 changed file with 111 additions and 22 deletions.
133 changes: 111 additions & 22 deletions proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/fabiolb/fabio/config"
"github.com/fabiolb/fabio/metrics"
"github.com/fabiolb/fabio/route"
grpc_proxy "github.com/mwitkow/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
Expand All @@ -41,42 +43,29 @@ func (s *gRPCServer) Serve(lis net.Listener) error {
}

func GetGRPCDirector(tlscfg *tls.Config) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {

connectionPool := newGrpcConnectionPool(tlscfg)

return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx)

if !ok {
return ctx, nil, fmt.Errorf("error extracting metadata from request")
}

outCtx, _ := context.WithCancel(ctx)
outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())

target, _ := ctx.Value(targetKey{}).(*route.Target)

if target == nil {
log.Println("[WARN] grpc: no route for ", fullMethodName)
return ctx, nil, fmt.Errorf("no route found")
}

opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(grpc_proxy.Codec())),
return outCtx, nil, fmt.Errorf("no route found")
}

if target.URL.Scheme == "grpcs" && tlscfg != nil {
opts = append(opts, grpc.WithTransportCredentials(
credentials.NewTLS(&tls.Config{
ClientCAs: tlscfg.ClientCAs,
InsecureSkipVerify: target.TLSSkipVerify,
// as per the http/2 spec, the host header isn't required, so if your
// target service doesn't have IP SANs in it's certificate
// then you will need to override the servername
ServerName: target.Opts["grpcservername"],
})))
} else {
opts = append(opts, grpc.WithInsecure())
}

newCtx := metadata.NewOutgoingContext(ctx, md)
conn, err := grpc.DialContext(newCtx, target.URL.Host, opts...)
conn, err := connectionPool.Get(outCtx, target)

return newCtx, conn, err
return outCtx, conn, err
}

}
Expand Down Expand Up @@ -203,3 +192,103 @@ func (h *GrpcStatsHandler) HandleConn(ctx context.Context, conn stats.ConnStats)
h.Connect.Inc(1)
}
}

type grpcConnectionPool struct {
connections map[*route.Target]*grpc.ClientConn
lock sync.RWMutex
cleanupInterval time.Duration
tlscfg *tls.Config
}

func newGrpcConnectionPool(tlscfg *tls.Config) *grpcConnectionPool {
cp := &grpcConnectionPool{
connections: make(map[*route.Target]*grpc.ClientConn),
lock: sync.RWMutex{},
cleanupInterval: time.Second * 5,
tlscfg: tlscfg,
}

go cp.cleanup()

return cp
}

func (p *grpcConnectionPool) Get(ctx context.Context, target *route.Target) (*grpc.ClientConn, error) {
p.lock.RLock()
conn := p.connections[target]
p.lock.RUnlock()

if conn != nil && conn.GetState() != connectivity.Shutdown {
return conn, nil
}

return p.newConnection(ctx, target)
}

func (p *grpcConnectionPool) newConnection(ctx context.Context, target *route.Target) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(grpc_proxy.Codec())),
}

if target.URL.Scheme == "grpcs" && p.tlscfg != nil {
opts = append(opts, grpc.WithTransportCredentials(
credentials.NewTLS(&tls.Config{
ClientCAs: p.tlscfg.ClientCAs,
InsecureSkipVerify: target.TLSSkipVerify,
// as per the http/2 spec, the host header isn't required, so if your
// target service doesn't have IP SANs in it's certificate
// then you will need to override the servername
ServerName: target.Opts["grpcservername"],
})))
} else {
opts = append(opts, grpc.WithInsecure())
}

conn, err := grpc.DialContext(ctx, target.URL.Host, opts...)

if err == nil {
p.Set(target, conn)
}

return conn, err
}

func (p *grpcConnectionPool) Set(target *route.Target, conn *grpc.ClientConn) {
p.lock.Lock()
defer p.lock.Unlock()
p.connections[target] = conn
}

func (p *grpcConnectionPool) cleanup() {
for {
p.lock.Lock()
table := route.GetTable()
for target, cs := range p.connections {
if cs.GetState() == connectivity.Shutdown {
delete(p.connections, target)
continue
}

if !hasTarget(target, table) {
log.Println("[DEBUG] grpc: cleaning up connection to", target.URL.Host)
cs.Close()
delete(p.connections, target)
}
}
p.lock.Unlock()
time.Sleep(p.cleanupInterval)
}
}

func hasTarget(target *route.Target, table route.Table) bool {
for _, routes := range table {
for _, r := range routes {
for _, t := range r.Targets {
if target == t {
return true
}
}
}
}
return false
}

0 comments on commit 4caac25

Please sign in to comment.