Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

registry/rpc: use simpleBalancer instead of ClientConn.State() #1673

Merged
merged 3 commits into from Nov 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion engine/rpcengine.go
Expand Up @@ -110,7 +110,12 @@ func rpcAcquireLeadership(reg registry.Registry, lManager lease.Manager, machID
return l
}

if existing != nil && existing.Version() >= ver {
// If reg is not ready, we have to give it an opportunity to steal the lease
// below. Otherwise it could be blocked forever by the existing engine leader,
// which would cause the gRPC registry to always fail when a leader already
// exists. Thus we return the existing leader only if reg.IsRegistryReady()
// returns true.
// TODO(dpark): refactor the entire function for better readability. - 20160908
if (existing != nil && existing.Version() >= ver) && reg.IsRegistryReady() {
log.Debugf("Lease already held by Machine(%s) operating at acceptable version %d", existing.MachineID(), existing.Version())
return existing
}
Expand Down
7 changes: 4 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Expand Up @@ -35,3 +35,4 @@ import:
- package: google.golang.org/api
subpackages:
- googleapi
- package: github.com/spf13/viper
2,247 changes: 699 additions & 1,548 deletions protobuf/fleet.pb.go

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions registry/rpc/balancer.go
@@ -0,0 +1,80 @@
// Copyright 2016 The fleet Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpc

import (
	"errors"
	"sync"
	"sync/atomic"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

// simpleBalancer implements the grpc.Balancer interface and is deliberately
// kept as simple as possible. It is meant to be used only by fleet.
//
// In principle a grpc.Balancer handles load balancing of RPCs across
// multiple connections, identified by their addresses:
// * Start() does initialization work to bootstrap a Balancer.
// * Up() informs the Balancer that gRPC has a connection to the server at addr.
// It returns Down(), which is called once the connection to addr gets lost
// or closed.
// * Get() gets the address of a server for the RPC corresponding to ctx.
// * Notify() returns a channel that is used by gRPC internals to watch the
// addresses gRPC needs to connect to.
// * Close() shuts down the balancer.
//
// However, as fleet only needs to care about a single connection, simpleBalancer
// should be kept as simple as possible. Most crucially, simpleBalancer provides
// a simple channel, readyc, to notify the RPC registry that the connection is
// available. readyc gets closed in Up(), which allows, for example,
// IsRegistryReady() to recognize that the connection is available. Only the
// closing of readyc matters; the values it carries are irrelevant.
type simpleBalancer struct {
// addrs holds the endpoints that Get() cycles through.
addrs []string
// numGets counts calls to Get(); it is updated atomically and used to pick
// the next entry of addrs.
numGets uint32

// readyc closes once the first connection is up
readyc chan struct{}
// readyOnce guards the close of readyc in Up(), so that repeated Up() calls
// close the channel only once.
readyOnce sync.Once
}

// newSimpleBalancer builds a simpleBalancer that serves the given endpoints.
// The eps slice is stored as-is (not copied), and readyc starts out open so
// that waiters block until Up() reports the first connection.
func newSimpleBalancer(eps []string) *simpleBalancer {
	b := new(simpleBalancer)
	b.addrs = eps
	b.readyc = make(chan struct{})
	return b
}

// Start is a no-op: simpleBalancer needs no bootstrapping, so target is
// ignored and the returned error is always nil.
func (b *simpleBalancer) Start(target string) error {
	return nil
}

// Up records that a connection has become available by closing readyc.
// The close is routed through readyOnce, so only the first call closes the
// channel; later calls leave it untouched. addr itself is not inspected.
//
// The returned "down" callback does nothing.
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
	signalReady := func() {
		close(b.readyc)
	}
	b.readyOnce.Do(signalReady)

	down := func(error) {}
	return down
}

// Get picks the address for the next RPC by cycling through b.addrs.
//
// Each call atomically increments b.numGets and uses the new counter value
// modulo len(b.addrs) as the index into b.addrs. ctx and opts are not
// consulted. On success the returned "put" callback is a no-op.
//
// If the balancer holds no addresses, Get returns an error instead of
// panicking on a modulo-by-zero.
func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
	n := uint32(len(b.addrs))
	if n == 0 {
		return grpc.Address{}, nil, errors.New("simpleBalancer: no addresses configured")
	}

	v := atomic.AddUint32(&b.numGets, 1)
	return grpc.Address{Addr: b.addrs[v%n]}, func() {}, nil
}

// Notify always returns a nil channel: simpleBalancer never publishes
// address updates.
func (b *simpleBalancer) Notify() <-chan []grpc.Address {
	return nil
}

// Close replaces readyc with a fresh, open channel, so that later receivers
// block again, and always returns nil.
//
// NOTE(review): readyOnce is not reset here, so if Up() has already run, a
// later Up() will not close the replacement channel. The assignment is also
// not synchronized with readers of readyc. Confirm whether either matters
// for callers.
func (b *simpleBalancer) Close() error {
	fresh := make(chan struct{})
	b.readyc = fresh
	return nil
}
60 changes: 16 additions & 44 deletions registry/rpc/rpcregistry.go
Expand Up @@ -32,22 +32,14 @@ import (
"github.com/coreos/fleet/unit"
)

const (
grpcConnectionTimeout = 5000 * time.Millisecond

grpcConnectionStateReady = "READY"
grpcConnectionStateConnecting = "CONNECTING"
grpcConnectionStateShutdown = "SHUTDOWN"
grpcConnectionStateFailure = "TRANSIENT_FAILURE"
)

var DebugRPCRegistry bool = false

type RPCRegistry struct {
dialer func(addr string, timeout time.Duration) (net.Conn, error)
mu *sync.Mutex
registryClient pb.RegistryClient
registryConn *grpc.ClientConn
balancer *simpleBalancer
}

func NewRPCRegistry(dialer func(string, time.Duration) (net.Conn, error)) *RPCRegistry {
Expand All @@ -63,35 +55,17 @@ func (r *RPCRegistry) ctx() context.Context {
}

func (r *RPCRegistry) getClient() pb.RegistryClient {
for {
st, err := r.registryConn.State()
if err != nil {
log.Fatalf("Unable to get the state of rpc connection: %v", err)
}
state := st.String()
if state == grpcConnectionStateReady {
break
} else if state == grpcConnectionStateConnecting {
if DebugRPCRegistry {
log.Infof("gRPC connection state: %s", state)
}
continue
} else if state == grpcConnectionStateFailure || state == grpcConnectionStateShutdown {
log.Infof("gRPC connection state '%s' reports an error in the connection", state)
log.Info("Reconnecting gRPC peer to fleet-engine...")
r.Connect()
}

time.Sleep(grpcConnectionTimeout)
}

return r.registryClient
}

func (r *RPCRegistry) Connect() {
// We want the connection operation to block and constantly reconnect using grpc backoff
log.Info("Starting gRPC connection to fleet-engine...")
connection, err := grpc.Dial(":fleet-engine:", grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), grpc.WithDialer(r.dialer), grpc.WithBlock())
ep_engines := []string{":fleet-engine:"}
r.balancer = newSimpleBalancer(ep_engines)
connection, err := grpc.Dial(ep_engines[0],
grpc.WithTimeout(12*time.Second), grpc.WithInsecure(),
grpc.WithDialer(r.dialer), grpc.WithBlock(), grpc.WithBalancer(r.balancer))
if err != nil {
log.Fatalf("Unable to dial to registry: %s", err)
}
Expand All @@ -106,24 +80,22 @@ func (r *RPCRegistry) Close() {

func (r *RPCRegistry) IsRegistryReady() bool {
if r.registryConn != nil {
st, err := r.registryConn.State()
if err != nil {
log.Fatalf("Unable to get the state of rpc connection: %v", err)
}
connState := st.String()
log.Infof("Registry connection state: %s", connState)
if connState != grpcConnectionStateReady {
log.Errorf("unable to connect to registry connection state: %s", connState)
return false
hasConn := false
if r.balancer != nil {
select {
case <-r.balancer.readyc:
hasConn = true
}
}
log.Infof("Getting server status...")

status, err := r.Status()
if err != nil {
log.Errorf("unable to get the status of the registry service %v", err)
return false
}
log.Infof("Status of rpc service: %d, connection state: %s", status, connState)
return status == pb.HealthCheckResponse_SERVING && err == nil
log.Infof("Status of rpc service: %d, balancer has a connection: %t", status, hasConn)

return hasConn && status == pb.HealthCheckResponse_SERVING && err == nil
}
return false
}
Expand Down
4 changes: 3 additions & 1 deletion registry/rpc/rpcregistry_test.go
Expand Up @@ -34,7 +34,9 @@ func TestRPCRegistryClientCreation(t *testing.T) {
t.Fatalf("failed to parse listener address: %v", err)
}
addr := "localhost:" + port
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock())
b := newSimpleBalancer([]string{addr})
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second),
grpc.WithBlock(), grpc.WithBalancer(b))
if err != nil {
t.Fatalf("failed to dial to the server %q: %v", addr, err)
}
Expand Down
2 changes: 0 additions & 2 deletions vendor/github.com/coreos/go-systemd/activation/listeners.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.