From b8151c3ed99dcab7dcd7bb4a1e0d78c240be8aca Mon Sep 17 00:00:00 2001 From: Nathan Johnson Date: Mon, 22 Feb 2021 18:09:09 -0600 Subject: [PATCH] fixes #807 - changes map for grpc connections to be a string key --- proxy/grpc_handler.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/proxy/grpc_handler.go b/proxy/grpc_handler.go index 863d834d8..d240e91e5 100644 --- a/proxy/grpc_handler.go +++ b/proxy/grpc_handler.go @@ -54,8 +54,7 @@ func GetGRPCDirector(tlscfg *tls.Config) func(ctx context.Context, fullMethodNam return ctx, nil, fmt.Errorf("error extracting metadata from request") } - outCtx, _ := context.WithCancel(ctx) - outCtx = metadata.NewOutgoingContext(outCtx, md.Copy()) + outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) target, _ := ctx.Value(targetKey{}).(*route.Target) @@ -88,6 +87,10 @@ func (p proxyStream) Context() context.Context { return p.ctx } +func makeGRPCTargetKey(t *route.Target) string { + return t.URL.String() +} + func (g GrpcProxyInterceptor) Stream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { ctx := stream.Context() @@ -199,7 +202,7 @@ func (h *GrpcStatsHandler) HandleConn(ctx context.Context, conn stats.ConnStats) } type grpcConnectionPool struct { - connections map[*route.Target]*grpc.ClientConn + connections map[string]*grpc.ClientConn lock sync.RWMutex cleanupInterval time.Duration tlscfg *tls.Config @@ -207,7 +210,7 @@ type grpcConnectionPool struct { func newGrpcConnectionPool(tlscfg *tls.Config) *grpcConnectionPool { cp := &grpcConnectionPool{ - connections: make(map[*route.Target]*grpc.ClientConn), + connections: make(map[string]*grpc.ClientConn), lock: sync.RWMutex{}, cleanupInterval: time.Second * 5, tlscfg: tlscfg, @@ -220,7 +223,7 @@ func newGrpcConnectionPool(tlscfg *tls.Config) *grpcConnectionPool { func (p *grpcConnectionPool) Get(ctx context.Context, target *route.Target) (*grpc.ClientConn, error) { p.lock.RLock() - conn := p.connections[target] + conn := p.connections[makeGRPCTargetKey(target)] p.lock.RUnlock() if conn != nil && conn.GetState() != connectivity.Shutdown { @@ -261,23 +264,24 @@ func (p *grpcConnectionPool) newConnection(ctx context.Context, target *route.Ta func (p *grpcConnectionPool) Set(target *route.Target, conn *grpc.ClientConn) { p.lock.Lock() defer p.lock.Unlock() - p.connections[target] = conn + + p.connections[makeGRPCTargetKey(target)] = conn } func (p *grpcConnectionPool) cleanup() { for { p.lock.Lock() table := route.GetTable() - for target, cs := range p.connections { + for tKey, cs := range p.connections { if cs.GetState() == connectivity.Shutdown { - delete(p.connections, target) + delete(p.connections, tKey) continue } - if !hasTarget(target, table) { - log.Println("[DEBUG] grpc: cleaning up connection to", target.URL.Host) + if !hasTarget(tKey, table) { + log.Println("[DEBUG] grpc: cleaning up connection to", tKey) cs.Close() - delete(p.connections, target) + delete(p.connections, tKey) } } p.lock.Unlock() @@ -285,11 +289,11 @@ func (p *grpcConnectionPool) cleanup() { } } -func hasTarget(target *route.Target, table route.Table) bool { +func hasTarget(tKey string, table route.Table) bool { for _, routes := range table { for _, r := range routes { for _, t := range r.Targets { - if target == t { + if tKey == makeGRPCTargetKey(t) { return true } }