Skip to content

Commit

Permalink
fixes #807 - changes map for grpc connections to be a string key
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanejohnson committed Feb 23, 2021
1 parent b8d800f commit b8151c3
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ func GetGRPCDirector(tlscfg *tls.Config) func(ctx context.Context, fullMethodNam
return ctx, nil, fmt.Errorf("error extracting metadata from request")
}

outCtx, _ := context.WithCancel(ctx)
outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())

target, _ := ctx.Value(targetKey{}).(*route.Target)

Expand Down Expand Up @@ -88,6 +87,10 @@ func (p proxyStream) Context() context.Context {
return p.ctx
}

func makeGRPCTargetKey(t *route.Target) string {
return t.URL.String()
}

func (g GrpcProxyInterceptor) Stream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()

Expand Down Expand Up @@ -199,15 +202,15 @@ func (h *GrpcStatsHandler) HandleConn(ctx context.Context, conn stats.ConnStats)
}

type grpcConnectionPool struct {
connections map[*route.Target]*grpc.ClientConn
connections map[string]*grpc.ClientConn
lock sync.RWMutex
cleanupInterval time.Duration
tlscfg *tls.Config
}

func newGrpcConnectionPool(tlscfg *tls.Config) *grpcConnectionPool {
cp := &grpcConnectionPool{
connections: make(map[*route.Target]*grpc.ClientConn),
connections: make(map[string]*grpc.ClientConn),
lock: sync.RWMutex{},
cleanupInterval: time.Second * 5,
tlscfg: tlscfg,
Expand All @@ -220,7 +223,7 @@ func newGrpcConnectionPool(tlscfg *tls.Config) *grpcConnectionPool {

func (p *grpcConnectionPool) Get(ctx context.Context, target *route.Target) (*grpc.ClientConn, error) {
p.lock.RLock()
conn := p.connections[target]
conn := p.connections[makeGRPCTargetKey(target)]
p.lock.RUnlock()

if conn != nil && conn.GetState() != connectivity.Shutdown {
Expand Down Expand Up @@ -261,35 +264,36 @@ func (p *grpcConnectionPool) newConnection(ctx context.Context, target *route.Ta
func (p *grpcConnectionPool) Set(target *route.Target, conn *grpc.ClientConn) {
p.lock.Lock()
defer p.lock.Unlock()
p.connections[target] = conn

p.connections[makeGRPCTargetKey(target)] = conn
}

func (p *grpcConnectionPool) cleanup() {
for {
p.lock.Lock()
table := route.GetTable()
for target, cs := range p.connections {
for tKey, cs := range p.connections {
if cs.GetState() == connectivity.Shutdown {
delete(p.connections, target)
delete(p.connections, tKey)
continue
}

if !hasTarget(target, table) {
log.Println("[DEBUG] grpc: cleaning up connection to", target.URL.Host)
if !hasTarget(tKey, table) {
log.Println("[DEBUG] grpc: cleaning up connection to", tKey)
cs.Close()
delete(p.connections, target)
delete(p.connections, tKey)
}
}
p.lock.Unlock()
time.Sleep(p.cleanupInterval)
}
}

func hasTarget(target *route.Target, table route.Table) bool {
func hasTarget(tKey string, table route.Table) bool {
for _, routes := range table {
for _, r := range routes {
for _, t := range r.Targets {
if target == t {
if tKey == makeGRPCTargetKey(t) {
return true
}
}
Expand Down

0 comments on commit b8151c3

Please sign in to comment.