From 6d3a053298f62c8ee49a7c25344acd6f3e56d390 Mon Sep 17 00:00:00 2001 From: Dongsu Park Date: Fri, 26 Aug 2016 16:59:23 +0200 Subject: [PATCH] registry/rpc: fix build errors from ClientConn.State() As ClientConn.State() of gRPC has disappeared, we need to avoid using ClientConn.State(). Instead we should make use of gRPC rebalancer mechanism, just like etcdv3 is doing. To do that, introduce simpleBalancer, as a minimum structure to be used for grpc.Balancer. --- registry/rpc/balancer.go | 75 ++++++++++++++++++++++++++++++++ registry/rpc/rpcregistry.go | 68 +++++++---------------------- registry/rpc/rpcregistry_test.go | 4 +- 3 files changed, 94 insertions(+), 53 deletions(-) create mode 100644 registry/rpc/balancer.go diff --git a/registry/rpc/balancer.go b/registry/rpc/balancer.go new file mode 100644 index 000000000..a687ea332 --- /dev/null +++ b/registry/rpc/balancer.go @@ -0,0 +1,75 @@ +// Copyright 2016 The fleet Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpc + +import ( + "net/url" + "strings" + "sync" + "sync/atomic" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type simpleBalancer struct { + addrs []string + notifyCh chan []grpc.Address + numGets uint32 + + // readyc closes once the first connection is up + readyc chan struct{} + readyOnce sync.Once +} + +func newSimpleBalancer(eps []string) *simpleBalancer { + notifyCh := make(chan []grpc.Address, 1) + addrs := make([]grpc.Address, len(eps)) + for i, ep := range eps { + addrs[i].Addr = getHost(ep) + } + notifyCh <- addrs + return &simpleBalancer{ + addrs: eps, + notifyCh: notifyCh, + readyc: make(chan struct{}), + } +} + +func (b *simpleBalancer) Start(target string) error { return nil } +func (b *simpleBalancer) Up(addr grpc.Address) func(error) { + b.readyOnce.Do(func() { close(b.readyc) }) + return func(error) {} +} + +func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { + v := atomic.AddUint32(&b.numGets, 1) + addr := b.addrs[v%uint32(len(b.addrs))] + return grpc.Address{Addr: addr}, func() {}, nil +} + +func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh } +func (b *simpleBalancer) Close() error { + close(b.notifyCh) + return nil +} + +func getHost(ep string) string { + url, uerr := url.Parse(ep) + if uerr != nil || !strings.Contains(ep, "://") { + return ep + } + return url.Host +} diff --git a/registry/rpc/rpcregistry.go b/registry/rpc/rpcregistry.go index c75eb0496..9c14ec79e 100644 --- a/registry/rpc/rpcregistry.go +++ b/registry/rpc/rpcregistry.go @@ -32,15 +32,6 @@ import ( "github.com/coreos/fleet/unit" ) -const ( - grpcConnectionTimeout = 5000 * time.Millisecond - - grpcConnectionStateReady = "READY" - grpcConnectionStateConnecting = "CONNECTING" - grpcConnectionStateShutdown = "SHUTDOWN" - grpcConnectionStateFailure = "TRANSIENT_FAILURE" -) - var DebugRPCRegistry bool = false type RPCRegistry struct { @@ -48,6 +39,7 @@ type RPCRegistry struct { mu *sync.Mutex registryClient pb.RegistryClient registryConn *grpc.ClientConn + balancer *simpleBalancer } func NewRPCRegistry(dialer func(string, time.Duration) (net.Conn, error)) *RPCRegistry { @@ -63,35 +55,17 @@ func (r *RPCRegistry) ctx() context.Context { } func (r *RPCRegistry) getClient() pb.RegistryClient { - for { - st, err := r.registryConn.State() - if err != nil { - log.Fatalf("Unable to get the state of rpc connection: %v", err) - } - state := st.String() - if state == grpcConnectionStateReady { - break - } else if state == grpcConnectionStateConnecting { - if DebugRPCRegistry { - log.Infof("gRPC connection state: %s", state) - } - continue - } else if state == grpcConnectionStateFailure || state == grpcConnectionStateShutdown { - log.Infof("gRPC connection state '%s' reports an error in the connection", state) - log.Info("Reconnecting gRPC peer to fleet-engine...") - r.Connect() - } - - time.Sleep(grpcConnectionTimeout) - } - return r.registryClient } func (r *RPCRegistry) Connect() { // We want the connection operation to block and constantly reconnect using grpc backoff log.Info("Starting gRPC connection to fleet-engine...") - connection, err := grpc.Dial(":fleet-engine:", grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), grpc.WithDialer(r.dialer), grpc.WithBlock()) + ep_engines := []string{":fleet-engine:"} + r.balancer = newSimpleBalancer(ep_engines) + connection, err := grpc.Dial(ep_engines[0], + grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), + grpc.WithDialer(r.dialer), grpc.WithBlock(), grpc.WithBalancer(r.balancer)) if err != nil { log.Fatalf("Unable to dial to registry: %s", err) } @@ -105,26 +79,16 @@ func (r *RPCRegistry) Close() { } func (r *RPCRegistry) IsRegistryReady() bool { - if r.registryConn != nil { - st, err := r.registryConn.State() - if err != nil { - log.Fatalf("Unable to get the state of rpc connection: %v", err) - } - connState := st.String() - log.Infof("Registry connection state: %s", connState) - if connState != grpcConnectionStateReady { - log.Errorf("unable to connect to registry connection state: %s", connState) - return false - } - log.Infof("Getting server status...") - status, err := r.Status() - if err != nil { - log.Errorf("unable to get the status of the registry service %v", err) - return false - } - log.Infof("Status of rpc service: %d, connection state: %s", status, connState) - return status == pb.HealthCheckResponse_SERVING && err == nil - } + // NOTE: checking for readyc doesn't work as expected. + // As a workaround, return false to get the connection to be reconnected. + // - dpark 20160826 + + // hasConn := false + // select { + // case <-r.balancer.readyc: + // hasConn = true + // } + // return hasConn return false } diff --git a/registry/rpc/rpcregistry_test.go b/registry/rpc/rpcregistry_test.go index 212bc13f3..410bbff1d 100644 --- a/registry/rpc/rpcregistry_test.go +++ b/registry/rpc/rpcregistry_test.go @@ -34,7 +34,9 @@ func TestRPCRegistryClientCreation(t *testing.T) { t.Fatalf("failed to parse listener address: %v", err) } addr := "localhost:" + port - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock()) + b := newSimpleBalancer([]string{addr}) + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), + grpc.WithBlock(), grpc.WithBalancer(b)) if err != nil { t.Fatalf("failed to dial to the server %q: %v", addr, err) }