diff --git a/registry/rpc/balancer.go b/registry/rpc/balancer.go new file mode 100644 index 000000000..a687ea332 --- /dev/null +++ b/registry/rpc/balancer.go @@ -0,0 +1,75 @@ +// Copyright 2016 The fleet Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpc + +import ( + "net/url" + "strings" + "sync" + "sync/atomic" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type simpleBalancer struct { + addrs []string + notifyCh chan []grpc.Address + numGets uint32 + + // readyc closes once the first connection is up + readyc chan struct{} + readyOnce sync.Once +} + +func newSimpleBalancer(eps []string) *simpleBalancer { + notifyCh := make(chan []grpc.Address, 1) + addrs := make([]grpc.Address, len(eps)) + for i, ep := range eps { + addrs[i].Addr = getHost(ep) + } + notifyCh <- addrs + return &simpleBalancer{ + addrs: eps, + notifyCh: notifyCh, + readyc: make(chan struct{}), + } +} + +func (b *simpleBalancer) Start(target string) error { return nil } +func (b *simpleBalancer) Up(addr grpc.Address) func(error) { + b.readyOnce.Do(func() { close(b.readyc) }) + return func(error) {} +} + +func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { + v := atomic.AddUint32(&b.numGets, 1) + addr := b.addrs[v%uint32(len(b.addrs))] + return grpc.Address{Addr: addr}, func() {}, nil +} + +func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh } +func (b *simpleBalancer) Close() error { + close(b.notifyCh) + return nil +} + +func getHost(ep string) string { + url, uerr := url.Parse(ep) + if uerr != nil || !strings.Contains(ep, "://") { + return ep + } + return url.Host +} diff --git a/registry/rpc/rpcregistry.go b/registry/rpc/rpcregistry.go index c75eb0496..9c14ec79e 100644 --- a/registry/rpc/rpcregistry.go +++ b/registry/rpc/rpcregistry.go @@ -32,15 +32,6 @@ import ( "github.com/coreos/fleet/unit" ) -const ( - grpcConnectionTimeout = 5000 * time.Millisecond - - grpcConnectionStateReady = "READY" - grpcConnectionStateConnecting = "CONNECTING" - grpcConnectionStateShutdown = "SHUTDOWN" - grpcConnectionStateFailure = "TRANSIENT_FAILURE" -) - var DebugRPCRegistry bool = false type RPCRegistry struct { @@ -48,6 +39,7 @@ type RPCRegistry struct { mu *sync.Mutex registryClient pb.RegistryClient registryConn *grpc.ClientConn + balancer *simpleBalancer } func NewRPCRegistry(dialer func(string, time.Duration) (net.Conn, error)) *RPCRegistry { @@ -63,35 +55,17 @@ func (r *RPCRegistry) ctx() context.Context { } func (r *RPCRegistry) getClient() pb.RegistryClient { - for { - st, err := r.registryConn.State() - if err != nil { - log.Fatalf("Unable to get the state of rpc connection: %v", err) - } - state := st.String() - if state == grpcConnectionStateReady { - break - } else if state == grpcConnectionStateConnecting { - if DebugRPCRegistry { - log.Infof("gRPC connection state: %s", state) - } - continue - } else if state == grpcConnectionStateFailure || state == grpcConnectionStateShutdown { - log.Infof("gRPC connection state '%s' reports an error in the connection", state) - log.Info("Reconnecting gRPC peer to fleet-engine...") - r.Connect() - } - - time.Sleep(grpcConnectionTimeout) - } - return r.registryClient } func (r *RPCRegistry) Connect() { // We want the connection operation to block and constantly reconnect using grpc backoff log.Info("Starting gRPC connection to fleet-engine...") - connection, err := grpc.Dial(":fleet-engine:", grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), grpc.WithDialer(r.dialer), grpc.WithBlock()) + ep_engines := []string{":fleet-engine:"} + r.balancer = newSimpleBalancer(ep_engines) + connection, err := grpc.Dial(ep_engines[0], + grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), + grpc.WithDialer(r.dialer), grpc.WithBlock(), grpc.WithBalancer(r.balancer)) if err != nil { log.Fatalf("Unable to dial to registry: %s", err) } @@ -105,26 +79,16 @@ func (r *RPCRegistry) Close() { } func (r *RPCRegistry) IsRegistryReady() bool { - if r.registryConn != nil { - st, err := r.registryConn.State() - if err != nil { - log.Fatalf("Unable to get the state of rpc connection: %v", err) - } - connState := st.String() - log.Infof("Registry connection state: %s", connState) - if connState != grpcConnectionStateReady { - log.Errorf("unable to connect to registry connection state: %s", connState) - return false - } - log.Infof("Getting server status...") - status, err := r.Status() - if err != nil { - log.Errorf("unable to get the status of the registry service %v", err) - return false - } - log.Infof("Status of rpc service: %d, connection state: %s", status, connState) - return status == pb.HealthCheckResponse_SERVING && err == nil - } + // NOTE: checking for readyc doesn't work as expected. + // As a workaround, return false to get the connection to be reconnected. + // - dpark 20160826 + + // hasConn := false + // select { + // case <-r.balancer.readyc: + // hasConn = true + // } + // return hasConn return false } diff --git a/registry/rpc/rpcregistry_test.go b/registry/rpc/rpcregistry_test.go index 212bc13f3..410bbff1d 100644 --- a/registry/rpc/rpcregistry_test.go +++ b/registry/rpc/rpcregistry_test.go @@ -34,7 +34,9 @@ func TestRPCRegistryClientCreation(t *testing.T) { t.Fatalf("failed to parse listener address: %v", err) } addr := "localhost:" + port - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock()) + b := newSimpleBalancer([]string{addr}) + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), + grpc.WithBlock(), grpc.WithBalancer(b)) if err != nil { t.Fatalf("failed to dial to the server %q: %v", addr, err) }