Skip to content

Commit

Permalink
grpcproxy, etcdmain, integration: add close channel to kv proxy
Browse files Browse the repository at this point in the history
ccache launches goroutines that need to be explicitly stopped.

Fixes #7158
  • Loading branch information
Anthony Romano committed Jan 18, 2017
1 parent 1a962df commit 8c0282a
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
os.Exit(1)
}

kvp := grpcproxy.NewKvProxy(client)
kvp, _ := grpcproxy.NewKvProxy(client)
watchp, _ := grpcproxy.NewWatchProxy(client)
clusterp := grpcproxy.NewClusterProxy(client)
leasep := grpcproxy.NewLeaseProxy(client)
Expand Down
26 changes: 16 additions & 10 deletions integration/cluster_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ var (
)

type grpcClientProxy struct {
grpc grpcAPI
wdonec <-chan struct{}
grpc grpcAPI
wdonec <-chan struct{}
kvdonec <-chan struct{}
}

func toGRPC(c *clientv3.Client) grpcAPI {
Expand All @@ -43,26 +44,30 @@ func toGRPC(c *clientv3.Client) grpcAPI {
}

wp, wpch := grpcproxy.NewWatchProxy(c)
kvp, kvpch := grpcproxy.NewKvProxy(c)
grpc := grpcAPI{
pb.NewClusterClient(c.ActiveConnection()),
grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
grpcproxy.KvServerToKvClient(kvp),
pb.NewLeaseClient(c.ActiveConnection()),
grpcproxy.WatchServerToWatchClient(wp),
pb.NewMaintenanceClient(c.ActiveConnection()),
pb.NewAuthClient(c.ActiveConnection()),
}
proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch}
proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch}
return grpc
}

type watchCloser struct {
type proxyCloser struct {
clientv3.Watcher
wdonec <-chan struct{}
wdonec <-chan struct{}
kvdonec <-chan struct{}
}

func (wc *watchCloser) Close() error {
err := wc.Watcher.Close()
<-wc.wdonec
func (pc *proxyCloser) Close() error {
// client ctx is canceled before calling close, so kv will close out
<-pc.kvdonec
err := pc.Watcher.Close()
<-pc.wdonec
return err
}

Expand All @@ -74,9 +79,10 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
rpc := toGRPC(c)
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
pmu.Lock()
c.Watcher = &watchCloser{
c.Watcher = &proxyCloser{
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
wdonec: proxies[c].wdonec,
kvdonec: proxies[c].kvdonec,
}
pmu.Unlock()
return c, nil
Expand Down
3 changes: 3 additions & 0 deletions proxy/grpcproxy/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Cache interface {
Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
Compact(revision int64)
Invalidate(key []byte, endkey []byte)
Close()
}

// keyFunc returns the key of an request, which is used to look up in the cache for it's caching response.
Expand All @@ -58,6 +59,8 @@ func NewCache(maxCacheEntries int) Cache {
}
}

func (c *cache) Close() { c.lru.Stop() }

// cache implements Cache
type cache struct {
mu sync.RWMutex
Expand Down
11 changes: 9 additions & 2 deletions proxy/grpcproxy/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,18 @@ type kvProxy struct {
cache cache.Cache
}

func NewKvProxy(c *clientv3.Client) pb.KVServer {
return &kvProxy{
func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
kv := &kvProxy{
kv: c.KV,
cache: cache.NewCache(cache.DefaultMaxEntries),
}
donec := make(chan struct{})
go func() {
defer close(donec)
<-c.Ctx().Done()
kv.cache.Close()
}()
return kv, donec
}

func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion proxy/grpcproxy/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
t.Fatal(err)
}

kvp := NewKvProxy(client)
kvp, _ := NewKvProxy(client)

kvts := &kvproxyTestServer{
kp: kvp,
Expand Down

0 comments on commit 8c0282a

Please sign in to comment.