Skip to content

Commit

Permalink
[roundrobin_strips_attributes] roundrobin test
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Nov 19, 2020
1 parent 621e9e2 commit 8227351
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 5 deletions.
2 changes: 0 additions & 2 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
//
// TODO: delete this when this balancer reads attributes.
sc.UpdateAddresses([]resolver.Address{a})
}
}
Expand Down
82 changes: 79 additions & 3 deletions balancer/roundrobin/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)

const (
testMDKey = "test-md"
)

type s struct {
grpctest.Tester
}
Expand All @@ -49,9 +55,23 @@ func Test(t *testing.T) {

type testServer struct {
testpb.UnimplementedTestServiceServer

testMDChan chan []string
}

func newTestServer() *testServer {
return &testServer{testMDChan: make(chan []string, 1)}
}

func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
md, ok := metadata.FromIncomingContext(ctx)
if ok && len(md[testMDKey]) != 0 {
select {
case s.testMDChan <- md[testMDKey]:
case <-ctx.Done():
return nil, ctx.Err()
}
}
return &testpb.Empty{}, nil
}

Expand All @@ -60,8 +80,9 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ
}

type test struct {
servers []*grpc.Server
addresses []string
servers []*grpc.Server
serverImpls []*testServer
addresses []string
}

func (t *test) cleanup() {
Expand All @@ -85,8 +106,10 @@ func startTestServers(count int) (_ *test, err error) {
}

s := grpc.NewServer()
testpb.RegisterTestServiceServer(s, &testServer{})
sImpl := newTestServer()
testpb.RegisterTestServiceServer(s, sImpl)
t.servers = append(t.servers, s)
t.serverImpls = append(t.serverImpls, sImpl)
t.addresses = append(t.addresses, lis.Addr().String())

go func(s *grpc.Server, l net.Listener) {
Expand Down Expand Up @@ -473,3 +496,56 @@ func (s) TestAllServersDown(t *testing.T) {
}
t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
}

func (s) TestUpdateAddressAttributes(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithResolvers(r), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}

r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}
// The first RPC should not set metadata, so there's no md in the channel.
select {
case md1 := <-test.serverImpls[0].testMDChan:
if len(md1) != 0 {
t.Fatalf("got md: %v, want empty metadata", md1)
}
case <-time.After(time.Microsecond * 100):
}

const testMDValue = "test-md-value"
// Update metadata in address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{
imetadata.Set(resolver.Address{Addr: test.addresses[0]}, metadata.Pairs(testMDKey, testMDValue)),
}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}

// The second RPC should send metadata with it.
md2 := <-test.serverImpls[0].testMDChan
if len(md2) == 0 || md2[0] != testMDValue {
t.Fatalf("got md: %v, want %v", md2, []string{testMDValue})
}
}

0 comments on commit 8227351

Please sign in to comment.