Skip to content
Permalink
Browse files

Change default config values and initial config logic. (#15)

* Change initial value for maxConn and maxStream

* Add more gcpInt tests

* Update docs

* Update docs

* minor fixes
  • Loading branch information...
WeiranFang committed Mar 30, 2019
1 parent a0862e6 commit e9c6cdb4ad676c0a2183f0854577936b86804d2e
@@ -20,6 +20,8 @@
Package grpcgcp provides grpc supports for Google Cloud APIs.
For now it provides connection management with affinity support.
Note: "channel" is analagous to "connection" in our context.
Usage:
1. First, initialize the api configuration. There are two ways:
@@ -105,7 +107,7 @@ Usage:
conn, err := grpc.Dial(
target,
// Following are channel management options:
// Register and specify the grpc-gcp load balancer.
grpc.WithBalancerName("grpc_gcp"),
grpc.WithUnaryInterceptor(gcpInt.GCPUnaryClientInterceptor),
grpc.WithStreamInterceptor(gcpInt.GCPStreamClientInterceptor),
@@ -33,10 +33,6 @@ const (
// Name is the name of grpc_gcp balancer.
Name = "grpc_gcp"

// Default settings for max pool size and max concurrent streams.
defaultMaxConn = 10
defaultMaxStream = 100

healthCheckEnabled = true
)

@@ -28,13 +28,27 @@ import (
"google.golang.org/grpc"
)

const (
// Default max number of connections is 0, meaning "no limit"
defaultMaxConn = 0

// Default max stream watermark is 100, which is the current stream limit for GFE.
// Any value >100 will be rounded down to 100.
defaultMaxStream = 100
)

type key int

var gcpKey key

type poolConfig struct {
maxConn uint32
maxStream uint32
}

type gcpContext struct {
affinityCfg *pb.AffinityConfig
channelPoolCfg *pb.ChannelPoolConfig
affinityCfg *pb.AffinityConfig
poolCfg *poolConfig
// request message used for pre-process of an affinity call
reqMsg interface{}
// response message used for post-process of an affinity call
@@ -44,7 +58,8 @@ type gcpContext struct {
// GCPInterceptor provides functions for intercepting client requests
// in order to support GCP specific features
type GCPInterceptor struct {
channelPoolCfg *pb.ChannelPoolConfig
poolCfg *poolConfig

// Maps method path to AffinityConfig
methodToAffinity map[string]*pb.AffinityConfig
}
@@ -62,8 +77,25 @@ func NewGCPInterceptor(config *pb.ApiConfig) *GCPInterceptor {
}
}
}

poolCfg := &poolConfig{
maxConn: defaultMaxConn,
maxStream: defaultMaxStream,
}

userPoolCfg := config.GetChannelPool()

// Set user defined MaxSize.
poolCfg.maxConn = userPoolCfg.GetMaxSize()

// Set user defined MaxConcurrentStreamsLowWatermark if ranged in [1, defaultMaxStream],
// otherwise use the defaultMaxStream.
watermarkValue := userPoolCfg.GetMaxConcurrentStreamsLowWatermark()
if watermarkValue >= 1 && watermarkValue <= defaultMaxStream {
poolCfg.maxStream = watermarkValue
}
return &GCPInterceptor{
channelPoolCfg: config.GetChannelPool(),
poolCfg: poolCfg,
methodToAffinity: mp,
}
}
@@ -81,10 +113,10 @@ func (gcpInt *GCPInterceptor) GCPUnaryClientInterceptor(
) error {
affinityCfg, _ := gcpInt.methodToAffinity[method]
gcpCtx := &gcpContext{
affinityCfg: affinityCfg,
reqMsg: req,
replyMsg: reply,
channelPoolCfg: gcpInt.channelPoolCfg,
affinityCfg: affinityCfg,
reqMsg: req,
replyMsg: reply,
poolCfg: gcpInt.poolCfg,
}
ctx = context.WithValue(ctx, gcpKey, gcpCtx)

@@ -139,9 +171,9 @@ func (cs *gcpClientStream) SendMsg(m interface{}) error {
ctx := cs.ctx
if ok {
gcpCtx := &gcpContext{
affinityCfg: affinityCfg,
reqMsg: m,
channelPoolCfg: cs.gcpInt.channelPoolCfg,
affinityCfg: affinityCfg,
reqMsg: m,
poolCfg: cs.gcpInt.poolCfg,
}
ctx = context.WithValue(cs.ctx, gcpKey, gcpCtx)
}
@@ -0,0 +1,148 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpcgcp

import (
"testing"

configpb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
"google.golang.org/grpc/balancer"
)

func TestInitApiConfig(t *testing.T) {
expectedSize := uint32(10)
expectedStreams := uint32(10)
apiConfig := &configpb.ApiConfig{
ChannelPool: &configpb.ChannelPoolConfig{
MaxSize: expectedSize,
MaxConcurrentStreamsLowWatermark: expectedStreams,
},
}
gcpInt := NewGCPInterceptor(apiConfig)
if gcpInt.poolCfg.maxConn != expectedSize {
t.Errorf("poolCfg has incorrect maxConn: %v, want: %v", gcpInt.poolCfg.maxConn, expectedSize)
}
if gcpInt.poolCfg.maxStream != expectedStreams {
t.Errorf("poolCfg has incorrect maxStream: %v, want: %v", gcpInt.poolCfg.maxStream, expectedStreams)
}
if len(gcpInt.methodToAffinity) != 0 {
t.Errorf("methodToAffinity has incorrect size: %v, want: %v", len(gcpInt.methodToAffinity), 0)
}
}

func TestDefaultApiConfig(t *testing.T) {
defaultSize := uint32(0)
defaultStreams := uint32(100)
apiConfig := &configpb.ApiConfig{
ChannelPool: &configpb.ChannelPoolConfig{},
}
gcpInt := NewGCPInterceptor(apiConfig)
if gcpInt.poolCfg.maxConn != defaultSize {
t.Errorf("poolCfg has incorrect maxConn: %v, want: %v", gcpInt.poolCfg.maxConn, defaultSize)
}
if gcpInt.poolCfg.maxStream != defaultStreams {
t.Errorf("poolCfg has incorrect maxStream: %v, want: %v", gcpInt.poolCfg.maxStream, defaultStreams)
}

apiConfig = &configpb.ApiConfig{
ChannelPool: &configpb.ChannelPoolConfig{
MaxConcurrentStreamsLowWatermark: 0,
},
}
gcpInt = NewGCPInterceptor(apiConfig)
if gcpInt.poolCfg.maxConn != defaultSize {
t.Errorf("poolCfg has incorrect maxConn: %v, want: %v", gcpInt.poolCfg.maxConn, defaultSize)
}
if gcpInt.poolCfg.maxStream != defaultStreams {
t.Errorf("poolCfg has incorrect maxStream: %v, want: %v", gcpInt.poolCfg.maxStream, defaultStreams)
}

apiConfig = &configpb.ApiConfig{
ChannelPool: &configpb.ChannelPoolConfig{
MaxConcurrentStreamsLowWatermark: 200,
},
}
gcpInt = NewGCPInterceptor(apiConfig)
if gcpInt.poolCfg.maxConn != defaultSize {
t.Errorf("poolCfg has incorrect maxConn: %v, want: %v", gcpInt.poolCfg.maxConn, defaultSize)
}
if gcpInt.poolCfg.maxStream != defaultStreams {
t.Errorf("poolCfg has incorrect maxStream: %v, want: %v", gcpInt.poolCfg.maxStream, defaultStreams)
}
}

func TestParseJsonApiConfig(t *testing.T) {
expectedSize := uint32(10)
expectedStreams := uint32(10)
apiConfig, err := ParseAPIConfig("test_config.json")
if err != nil {
t.Fatalf("Failed to parse api config file: %v", err)
}

// Register test builder wrapper
balancer.Register(&testBuilderWrapper{
name: Name,
realBuilder: &gcpBalancerBuilder{name: Name},
})

gcpInt := NewGCPInterceptor(apiConfig)

if gcpInt.poolCfg.maxConn != expectedSize {
t.Errorf("poolCfg has incorrect maxConn: %v, want: %v", gcpInt.poolCfg.maxConn, expectedSize)
}
if gcpInt.poolCfg.maxStream != expectedStreams {
t.Errorf("poolCfg has incorrect maxStream: %v, want: %v", gcpInt.poolCfg.maxStream, expectedStreams)
}

if gcpInt.methodToAffinity == nil {
t.Fatalf("gcpInt.methodToAffinity should not be nil")
}

expectedMethods := 3
if len(gcpInt.methodToAffinity) != expectedMethods {
t.Errorf("methodToAffinity has incorrect size: %v, want: %v", len(gcpInt.methodToAffinity), expectedMethods)
}

methodName := "method1"
affCfg, ok := gcpInt.methodToAffinity[methodName]
if !ok {
t.Fatalf("gcpInt.methodToAffinity should contain key: %v", methodName)
}
if affCfg.GetCommand() != configpb.AffinityConfig_BIND {
t.Errorf("affinity config has incorrect command: %v, want: %v", affCfg.GetCommand(), configpb.AffinityConfig_BIND)
}

methodName = "method2"
affCfg, ok = gcpInt.methodToAffinity[methodName]
if !ok {
t.Fatalf("gcpInt.methodToAffinity should contain key: %v", methodName)
}
if affCfg.GetCommand() != configpb.AffinityConfig_BOUND {
t.Errorf("affinity config has incorrect command: %v, want: %v", affCfg.GetCommand(), configpb.AffinityConfig_BOUND)
}

methodName = "method3"
affCfg, ok = gcpInt.methodToAffinity[methodName]
if !ok {
t.Fatalf("gcpInt.methodToAffinity should contain key: %v", methodName)
}
if affCfg.GetCommand() != configpb.AffinityConfig_UNBIND {
t.Errorf("affinity config has incorrect command: %v, want: %v", affCfg.GetCommand(), configpb.AffinityConfig_UNBIND)
}
}
@@ -34,16 +34,15 @@ func newGCPPicker(readySCRefs []*subConnRef, gb *gcpBalancer) balancer.Picker {
return &gcpPicker{
gcpBalancer: gb,
scRefs: readySCRefs,
poolCfg: nil,
}
}

type gcpPicker struct {
gcpBalancer *gcpBalancer

mu sync.Mutex
scRefs []*subConnRef
maxConn uint32
maxStream uint32
mu sync.Mutex
scRefs []*subConnRef
poolCfg *poolConfig
}

func (p *gcpPicker) Pick(
@@ -61,25 +60,11 @@ func (p *gcpPicker) Pick(
boundKey := ""

if hasGcpCtx {
affinity := gcpCtx.affinityCfg
channelPool := gcpCtx.channelPoolCfg
if channelPool != nil {
// Initialize p.maxConn and p.maxStream for the first time.
if p.maxConn == 0 {
if channelPool.GetMaxSize() == 0 {
p.maxConn = defaultMaxConn
} else {
p.maxConn = channelPool.GetMaxSize()
}
}
if p.maxStream == 0 {
if channelPool.GetMaxConcurrentStreamsLowWatermark() == 0 {
p.maxStream = defaultMaxStream
} else {
p.maxStream = channelPool.GetMaxConcurrentStreamsLowWatermark()
}
}
if p.poolCfg == nil {
// Initialize poolConfig for picker.
p.poolCfg = gcpCtx.poolCfg
}
affinity := gcpCtx.affinityCfg
if affinity != nil {
locator := affinity.GetAffinityKey()
cmd := affinity.GetCommand()
@@ -136,13 +121,13 @@ func (p *gcpPicker) getSubConnRef(boundKey string) (*subConnRef, error) {
})

// If the least busy connection still has capacity, use it
if len(p.scRefs) > 0 && p.scRefs[0].streamsCnt < int32(p.maxStream) {
if len(p.scRefs) > 0 && p.scRefs[0].streamsCnt < int32(p.poolCfg.maxStream) {
return p.scRefs[0], nil
}

if p.gcpBalancer.getConnectionPoolSize() < int(p.maxConn) {
if p.poolCfg.maxConn == 0 || p.gcpBalancer.getConnectionPoolSize() < int(p.poolCfg.maxConn) {
// Ask balancer to create new subconn when all current subconns are busy and
// the number of subconns has not reached maximum.
// the connection pool still has capacity (either unlimited or maxSize is not reached).
p.gcpBalancer.newSubConn()

// Let this picker return ErrNoSubConnAvailable because it needs some time
@@ -23,7 +23,6 @@ import (
"fmt"
"testing"

"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/mocks"
"github.com/golang/mock/gomock"
"google.golang.org/grpc/balancer"
@@ -141,9 +140,9 @@ func TestPickSubConnWithLeastStreams(t *testing.T) {

ctx := context.Background()
gcpCtx := &gcpContext{
channelPoolCfg: &grpc_gcp.ChannelPoolConfig{
MaxSize: 10,
MaxConcurrentStreamsLowWatermark: 100,
poolCfg: &poolConfig{
maxConn: 10,
maxStream: 100,
},
}
ctx = context.WithValue(ctx, gcpKey, gcpCtx)
@@ -188,9 +187,9 @@ func TestPickNewSubConn(t *testing.T) {

ctx := context.Background()
gcpCtx := &gcpContext{
channelPoolCfg: &grpc_gcp.ChannelPoolConfig{
MaxSize: 10,
MaxConcurrentStreamsLowWatermark: 100,
poolCfg: &poolConfig{
maxConn: 10,
maxStream: 100,
},
}
ctx = context.WithValue(ctx, gcpKey, gcpCtx)

0 comments on commit e9c6cdb

Please sign in to comment.
You can’t perform that action at this time.