Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
registry/rpc: fix build errors from ClientConn.State()
Browse files Browse the repository at this point in the history
As ClientConn.State() of gRPC has disappeared, we need to avoid using
ClientConn.State(). Instead we should make use of gRPC rebalancer
mechanism, just like etcdv3 is doing. To do that, introduce
simpleBalancer, as a minimum structure to be used for grpc.Balancer.
  • Loading branch information
Dongsu Park committed Aug 26, 2016
1 parent a7ba357 commit af9b893
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 53 deletions.
75 changes: 75 additions & 0 deletions registry/rpc/balancer.go
@@ -0,0 +1,75 @@
// Copyright 2016 The fleet Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpc

import (
"net/url"
"strings"
"sync"
"sync/atomic"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

type simpleBalancer struct {
addrs []string
notifyCh chan []grpc.Address
numGets uint32

// readyc closes once the first connection is up
readyc chan struct{}
readyOnce sync.Once
}

func newSimpleBalancer(eps []string) *simpleBalancer {
notifyCh := make(chan []grpc.Address, 1)
addrs := make([]grpc.Address, len(eps))
for i, ep := range eps {
addrs[i].Addr = getHost(ep)
}
notifyCh <- addrs
return &simpleBalancer{
addrs: eps,
notifyCh: notifyCh,
readyc: make(chan struct{}),
}
}

func (b *simpleBalancer) Start(target string) error { return nil }
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.readyOnce.Do(func() { close(b.readyc) })
return func(error) {}
}

func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
v := atomic.AddUint32(&b.numGets, 1)
addr := b.addrs[v%uint32(len(b.addrs))]
return grpc.Address{Addr: addr}, func() {}, nil
}

func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
func (b *simpleBalancer) Close() error {
close(b.notifyCh)
return nil
}

func getHost(ep string) string {
url, uerr := url.Parse(ep)
if uerr != nil || !strings.Contains(ep, "://") {
return ep
}
return url.Host
}
68 changes: 16 additions & 52 deletions registry/rpc/rpcregistry.go
Expand Up @@ -32,22 +32,14 @@ import (
"github.com/coreos/fleet/unit"
)

const (
grpcConnectionTimeout = 5000 * time.Millisecond

grpcConnectionStateReady = "READY"
grpcConnectionStateConnecting = "CONNECTING"
grpcConnectionStateShutdown = "SHUTDOWN"
grpcConnectionStateFailure = "TRANSIENT_FAILURE"
)

var DebugRPCRegistry bool = false

type RPCRegistry struct {
dialer func(addr string, timeout time.Duration) (net.Conn, error)
mu *sync.Mutex
registryClient pb.RegistryClient
registryConn *grpc.ClientConn
balancer *simpleBalancer
}

func NewRPCRegistry(dialer func(string, time.Duration) (net.Conn, error)) *RPCRegistry {
Expand All @@ -63,35 +55,17 @@ func (r *RPCRegistry) ctx() context.Context {
}

func (r *RPCRegistry) getClient() pb.RegistryClient {
for {
st, err := r.registryConn.State()
if err != nil {
log.Fatalf("Unable to get the state of rpc connection: %v", err)
}
state := st.String()
if state == grpcConnectionStateReady {
break
} else if state == grpcConnectionStateConnecting {
if DebugRPCRegistry {
log.Infof("gRPC connection state: %s", state)
}
continue
} else if state == grpcConnectionStateFailure || state == grpcConnectionStateShutdown {
log.Infof("gRPC connection state '%s' reports an error in the connection", state)
log.Info("Reconnecting gRPC peer to fleet-engine...")
r.Connect()
}

time.Sleep(grpcConnectionTimeout)
}

return r.registryClient
}

func (r *RPCRegistry) Connect() {
// We want the connection operation to block and constantly reconnect using grpc backoff
log.Info("Starting gRPC connection to fleet-engine...")
connection, err := grpc.Dial(":fleet-engine:", grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), grpc.WithDialer(r.dialer), grpc.WithBlock())
ep_engines := []string{":fleet-engine:"}
r.balancer = newSimpleBalancer(ep_engines)
connection, err := grpc.Dial(ep_engines[0],
grpc.WithTimeout(12*time.Second), grpc.WithInsecure(),
grpc.WithDialer(r.dialer), grpc.WithBlock(), grpc.WithBalancer(r.balancer))
if err != nil {
log.Fatalf("Unable to dial to registry: %s", err)
}
Expand All @@ -105,26 +79,16 @@ func (r *RPCRegistry) Close() {
}

func (r *RPCRegistry) IsRegistryReady() bool {
if r.registryConn != nil {
st, err := r.registryConn.State()
if err != nil {
log.Fatalf("Unable to get the state of rpc connection: %v", err)
}
connState := st.String()
log.Infof("Registry connection state: %s", connState)
if connState != grpcConnectionStateReady {
log.Errorf("unable to connect to registry connection state: %s", connState)
return false
}
log.Infof("Getting server status...")
status, err := r.Status()
if err != nil {
log.Errorf("unable to get the status of the registry service %v", err)
return false
}
log.Infof("Status of rpc service: %d, connection state: %s", status, connState)
return status == pb.HealthCheckResponse_SERVING && err == nil
}
// NOTE: checking for readyc doesn't work as expected.
// As a workaround, return false to get the connection to be reconnected.
// - dpark 20160826

// hasConn := false
// select {
// case <-r.balancer.readyc:
// hasConn = true
// }
// return hasConn
return false
}

Expand Down
4 changes: 3 additions & 1 deletion registry/rpc/rpcregistry_test.go
Expand Up @@ -34,7 +34,9 @@ func TestRPCRegistryClientCreation(t *testing.T) {
t.Fatalf("failed to parse listener address: %v", err)
}
addr := "localhost:" + port
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock())
b := newSimpleBalancer([]string{addr})
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second),
grpc.WithBlock(), grpc.WithBalancer(b))
if err != nil {
t.Fatalf("failed to dial to the server %q: %v", addr, err)
}
Expand Down

0 comments on commit af9b893

Please sign in to comment.