Skip to content

Commit

Permalink
Update grpclb proto and move grpclb into package grpc (#1186)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Apr 25, 2017
1 parent 38df39b commit c73e016
Show file tree
Hide file tree
Showing 5 changed files with 481 additions and 248 deletions.
129 changes: 83 additions & 46 deletions grpclb/grpclb.go → grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
*
*/

// Package grpclb implements the load balancing protocol defined at
// https://github.com/grpc/grpc/blob/master/doc/load-balancing.md.
// The implementation is currently EXPERIMENTAL.
package grpclb
package grpc

import (
"errors"
Expand All @@ -45,14 +42,50 @@ import (
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/naming"
)

// Client API for LoadBalancer service.
// Mostly copied from generated pb.go file.
// To avoid circular dependency.
type loadBalancerClient struct {
cc *ClientConn
}

func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
desc := &StreamDesc{
StreamName: "BalanceLoad",
ServerStreams: true,
ClientStreams: true,
}
stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
if err != nil {
return nil, err
}
x := &balanceLoadClientStream{stream}
return x, nil
}

type balanceLoadClientStream struct {
ClientStream
}

func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
return x.ClientStream.SendMsg(m)
}

func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
m := new(lbpb.LoadBalanceResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

// AddressType indicates the address type returned by name resolution.
type AddressType uint8

Expand All @@ -63,18 +96,18 @@ const (
GRPCLB
)

// Metadata contains the information the name resolution for grpclb should provide. The
// AddrMetadataGRPCLB contains the information the name resolution for grpclb should provide. The
// name resolver used by grpclb balancer is required to provide this type of metadata in
// its address updates.
type Metadata struct {
type AddrMetadataGRPCLB struct {
// AddrType is the type of server (grpc load balancer or backend).
AddrType AddressType
// ServerName is the name of the grpc load balancer. Used for authentication.
ServerName string
}

// Balancer creates a grpclb load balancer.
func Balancer(r naming.Resolver) grpc.Balancer {
// NewGRPCLBBalancer creates a grpclb load balancer.
func NewGRPCLBBalancer(r naming.Resolver) Balancer {
return &balancer{
r: r,
}
Expand All @@ -86,13 +119,16 @@ type remoteBalancerInfo struct {
name string
}

// addrInfo consists of the information of a backend server.
type addrInfo struct {
addr grpc.Address
// grpclbAddrInfo consists of the information of a backend server.
type grpclbAddrInfo struct {
addr Address
connected bool
// dropRequest indicates whether a particular RPC which chooses this address
// should be dropped.
dropRequest bool
// dropForRateLimiting indicates whether this particular request should be
// dropped by the client for rate limiting.
dropForRateLimiting bool
// dropForLoadBalancing indicates whether this particular request should be
// dropped by the client for load balancing.
dropForLoadBalancing bool
}

type balancer struct {
Expand All @@ -101,9 +137,9 @@ type balancer struct {
mu sync.Mutex
seq int // a sequence number to make sure addrCh does not get stale addresses.
w naming.Watcher
addrCh chan []grpc.Address
addrCh chan []Address
rbs []remoteBalancerInfo
addrs []*addrInfo
addrs []*grpclbAddrInfo
next int
waitCh chan struct{}
done bool
Expand All @@ -119,7 +155,7 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
b.mu.Lock()
defer b.mu.Unlock()
if b.done {
return grpc.ErrClientConnClosing
return ErrClientConnClosing
}
for _, update := range updates {
switch update.Op {
Expand All @@ -135,7 +171,7 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
if exist {
continue
}
md, ok := update.Metadata.(*Metadata)
md, ok := update.Metadata.(*AddrMetadataGRPCLB)
if !ok {
// TODO: Revisit the handling here and may introduce some fallback mechanism.
grpclog.Printf("The name resolution contains unexpected metadata %v", update.Metadata)
Expand Down Expand Up @@ -206,18 +242,19 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
servers := l.GetServers()
expiration := convertDuration(l.GetExpirationInterval())
var (
sl []*addrInfo
addrs []grpc.Address
sl []*grpclbAddrInfo
addrs []Address
)
for _, s := range servers {
md := metadata.Pairs("lb-token", s.LoadBalanceToken)
addr := grpc.Address{
addr := Address{
Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port),
Metadata: &md,
}
sl = append(sl, &addrInfo{
addr: addr,
dropRequest: s.DropRequest,
sl = append(sl, &grpclbAddrInfo{
addr: addr,
dropForRateLimiting: s.DropForRateLimiting,
dropForLoadBalancing: s.DropForLoadBalancing,
})
addrs = append(addrs, addr)
}
Expand All @@ -244,7 +281,7 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
return
}

func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (retry bool) {
func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbc.BalanceLoad(ctx)
Expand Down Expand Up @@ -306,7 +343,7 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (ret
return true
}

func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
func (b *balancer) Start(target string, config BalancerConfig) error {
b.rand = rand.New(rand.NewSource(time.Now().Unix()))
// TODO: Fall back to the basic direct connection if there is no name resolver.
if b.r == nil {
Expand All @@ -316,9 +353,9 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
b.mu.Lock()
if b.done {
b.mu.Unlock()
return grpc.ErrClientConnClosing
return ErrClientConnClosing
}
b.addrCh = make(chan []grpc.Address)
b.addrCh = make(chan []Address)
w, err := b.r.Resolve(target)
if err != nil {
b.mu.Unlock()
Expand All @@ -340,7 +377,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
// Spawn a goroutine to talk to the remote load balancer.
go func() {
var (
cc *grpc.ClientConn
cc *ClientConn
// ccError is closed when there is an error in the current cc.
// A new rb should be picked from rbs and connected.
ccError chan struct{}
Expand Down Expand Up @@ -419,15 +456,15 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
creds := config.DialCreds
ccError = make(chan struct{})
if creds == nil {
cc, err = grpc.Dial(rb.addr, grpc.WithInsecure())
cc, err = Dial(rb.addr, WithInsecure())
} else {
if rb.name != "" {
if err := creds.OverrideServerName(rb.name); err != nil {
grpclog.Printf("Failed to override the server name in the credentials: %v", err)
continue
}
}
cc, err = grpc.Dial(rb.addr, grpc.WithTransportCredentials(creds))
cc, err = Dial(rb.addr, WithTransportCredentials(creds))
}
if err != nil {
grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
Expand All @@ -439,8 +476,8 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
seq := b.seq
b.next = 0
b.mu.Unlock()
go func(cc *grpc.ClientConn, ccError chan struct{}) {
lbc := lbpb.NewLoadBalancerClient(cc)
go func(cc *ClientConn, ccError chan struct{}) {
lbc := &loadBalancerClient{cc}
b.callRemoteBalancer(lbc, seq)
cc.Close()
select {
Expand All @@ -454,7 +491,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
return nil
}

func (b *balancer) down(addr grpc.Address, err error) {
func (b *balancer) down(addr Address, err error) {
b.mu.Lock()
defer b.mu.Unlock()
for _, a := range b.addrs {
Expand All @@ -465,7 +502,7 @@ func (b *balancer) down(addr grpc.Address, err error) {
}
}

func (b *balancer) Up(addr grpc.Address) func(error) {
func (b *balancer) Up(addr Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.done {
Expand All @@ -479,7 +516,7 @@ func (b *balancer) Up(addr grpc.Address) func(error) {
}
a.connected = true
}
if a.connected && !a.dropRequest {
if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
cnt++
}
}
Expand All @@ -493,12 +530,12 @@ func (b *balancer) Up(addr grpc.Address) func(error) {
}
}

func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr grpc.Address, put func(), err error) {
func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
var ch chan struct{}
b.mu.Lock()
if b.done {
b.mu.Unlock()
err = grpc.ErrClientConnClosing
err = ErrClientConnClosing
return
}

Expand All @@ -511,7 +548,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
a := b.addrs[next]
next = (next + 1) % len(b.addrs)
if a.connected {
if !a.dropRequest {
if !a.dropForRateLimiting && !a.dropForLoadBalancing {
addr = a.addr
b.next = next
b.mu.Unlock()
Expand All @@ -520,7 +557,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
if !opts.BlockingWait {
b.next = next
b.mu.Unlock()
err = grpc.Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
return
}
}
Expand All @@ -533,7 +570,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
if !opts.BlockingWait {
if len(b.addrs) == 0 {
b.mu.Unlock()
err = grpc.Errorf(codes.Unavailable, "there is no address available")
err = Errorf(codes.Unavailable, "there is no address available")
return
}
// Returns the next addr on b.addrs for a failfast RPC.
Expand All @@ -559,7 +596,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
b.mu.Lock()
if b.done {
b.mu.Unlock()
err = grpc.ErrClientConnClosing
err = ErrClientConnClosing
return
}

Expand All @@ -572,7 +609,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
a := b.addrs[next]
next = (next + 1) % len(b.addrs)
if a.connected {
if !a.dropRequest {
if !a.dropForRateLimiting && !a.dropForLoadBalancing {
addr = a.addr
b.next = next
b.mu.Unlock()
Expand All @@ -581,7 +618,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
if !opts.BlockingWait {
b.next = next
b.mu.Unlock()
err = grpc.Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
return
}
}
Expand All @@ -603,7 +640,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
}
}

func (b *balancer) Notify() <-chan []grpc.Address {
func (b *balancer) Notify() <-chan []Address {
return b.addrCh
}

Expand Down

0 comments on commit c73e016

Please sign in to comment.