Skip to content

Commit

Permalink
review comments: assign channelz IDs even when channelz is turned OFF
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Feb 17, 2022
1 parent 3e5c516 commit 786e6be
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 59 deletions.
11 changes: 9 additions & 2 deletions call_test.go
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -123,12 +124,16 @@ type server struct {
startedErr chan error // sent nil or an error after server starts
mu sync.Mutex
conns map[transport.ServerTransport]bool
channelzID *channelz.Identifier
}

type ctxKey string

func newTestServer() *server {
return &server{startedErr: make(chan error, 1)}
return &server{
startedErr: make(chan error, 1),
channelzID: channelz.NewIdentifierForTesting(channelz.RefServer, time.Now().Unix(), nil),
}
}

// start starts server. Other goroutines should block on s.startedErr for further operations.
Expand Down Expand Up @@ -158,10 +163,12 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32) {
return
}
config := &transport.ServerConfig{
MaxStreams: maxStreams,
MaxStreams: maxStreams,
ChannelzParentID: s.channelzID,
}
st, err := transport.NewServerTransport(conn, config)
if err != nil {
t.Errorf("failed to create server transport: %v", err)
continue
}
s.mu.Lock()
Expand Down
4 changes: 1 addition & 3 deletions clientconn.go
Expand Up @@ -1314,9 +1314,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne

connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
defer cancel()
if channelz.IsOn() {
copts.ChannelzParentID = ac.channelzID
}
copts.ChannelzParentID = ac.channelzID

newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
if err != nil {
Expand Down
46 changes: 23 additions & 23 deletions internal/channelz/funcs.go
Expand Up @@ -191,12 +191,8 @@ func GetServer(id int64) *ServerMetric {
//
// Returns a unique channelz identifier assigned to this channel.
//
// If channelz is not turned ON, this function is a no-op.
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
if !IsOn() {
return nil
}

id := idGen.genID()
parent := int64(0)
isTopChannel := true
Expand All @@ -205,6 +201,10 @@ func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
parent = pid.Int()
}

if !IsOn() {
return newIdentifer(RefChannel, id, pid)
}

cn := &channel{
refName: ref,
c: c,
Expand All @@ -224,16 +224,16 @@ func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
//
// Returns a unique channelz identifier assigned to this subChannel.
//
// If channelz is not turned ON, this function is a no-op.
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) {
if !IsOn() {
return nil, nil
}

if pid == nil {
return nil, errors.New("a SubChannel's parent id cannot be nil")
}
id := idGen.genID()
if !IsOn() {
return newIdentifer(RefSubChannel, id, pid), nil
}

sc := &subChannel{
refName: ref,
c: c,
Expand All @@ -249,13 +249,13 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er
// RegisterServer registers the given server s in channelz database. It returns
// the unique channelz tracking id assigned to this server.
//
// If channelz is not turned ON, this function is a no-op.
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterServer(s Server, ref string) *Identifier {
id := idGen.genID()
if !IsOn() {
return nil
return newIdentifer(RefServer, id, nil)
}

id := idGen.genID()
svr := &server{
refName: ref,
s: s,
Expand All @@ -272,16 +272,16 @@ func RegisterServer(s Server, ref string) *Identifier {
// (identified by pid). It returns the unique channelz tracking id assigned to
// this listen socket.
//
// If channelz is not turned ON, this function is a no-op.
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
if !IsOn() {
return nil, nil
}

if pid == nil {
return nil, errors.New("a ListenSocket's parent id cannot be 0")
}
id := idGen.genID()
if !IsOn() {
return newIdentifer(RefListenSocket, id, pid), nil
}

ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()}
db.get().addListenSocket(id, ls, pid.Int())
return newIdentifer(RefListenSocket, id, pid), nil
Expand All @@ -292,16 +292,16 @@ func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, e
// (identified by pid). It returns the unique channelz tracking id assigned to
// this normal socket.
//
// If channelz is not turned ON, this function is a no-op.
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
if !IsOn() {
return nil, nil
}

if pid == nil {
return nil, errors.New("a NormalSocket's parent id cannot be 0")
}
id := idGen.genID()
if !IsOn() {
return newIdentifer(RefNormalSocket, id, pid), nil
}

ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()}
db.get().addNormalSocket(id, ns, pid.Int())
return newIdentifer(RefNormalSocket, id, pid), nil
Expand Down
39 changes: 26 additions & 13 deletions internal/transport/keepalive_test.go
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"golang.org/x/net/http2"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/keepalive"
)
Expand Down Expand Up @@ -252,11 +253,15 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
// logic is running even without any active streams.
func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
PermitWithoutStream: true,
}}, connCh)
copts := ConnectOptions{
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
PermitWithoutStream: true,
},
}
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))

Expand Down Expand Up @@ -284,10 +289,14 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
// active streams, and therefore the transport stays open.
func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
}}, connCh)
copts := ConnectOptions{
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
},
}
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))

Expand All @@ -313,10 +322,14 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
// transport even when there is an active stream.
func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
}}, connCh)
copts := ConnectOptions{
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
},
}
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))

Expand Down
60 changes: 42 additions & 18 deletions internal/transport/transport_test.go
Expand Up @@ -40,6 +40,7 @@ import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/internal/testutils"
Expand All @@ -55,16 +56,6 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

type server struct {
lis net.Listener
port string
startedErr chan error // error (or nil) with server start value
mu sync.Mutex
conns map[ServerTransport]bool
h *testStreamHandler
ready chan struct{}
}

var (
expectedRequest = []byte("ping")
expectedResponse = []byte("pong")
Expand Down Expand Up @@ -298,6 +289,25 @@ func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) {
}
}

type server struct {
lis net.Listener
port string
startedErr chan error // error (or nil) with server start value
mu sync.Mutex
conns map[ServerTransport]bool
h *testStreamHandler
ready chan struct{}
channelzID *channelz.Identifier
}

func newTestServer() *server {
return &server{
startedErr: make(chan error, 1),
ready: make(chan struct{}),
channelzID: channelz.NewIdentifierForTesting(channelz.RefServer, time.Now().Unix(), nil),
}
}

// start starts server. Other goroutines should block on s.readyChan for further operations.
func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
var err error
Expand Down Expand Up @@ -421,9 +431,10 @@ func (s *server) addr() string {
return s.lis.Addr().String()
}

func setUpServerOnly(t *testing.T, port int, serverConfig *ServerConfig, ht hType) *server {
server := &server{startedErr: make(chan error, 1), ready: make(chan struct{})}
go server.start(t, port, serverConfig, ht)
func setUpServerOnly(t *testing.T, port int, sc *ServerConfig, ht hType) *server {
server := newTestServer()
sc.ChannelzParentID = server.channelzID
go server.start(t, port, sc, ht)
server.wait(t, 2*time.Second)
return server
}
Expand All @@ -432,9 +443,11 @@ func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, *http2
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
}

func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
server := setUpServerOnly(t, port, serverConfig, ht)
func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
server := setUpServerOnly(t, port, sc, ht)
addr := resolver.Address{Addr: "localhost:" + server.port}
copts.ChannelzParentID = channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)

connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func() {}, func(GoAwayReason) {}, func() {})
if connErr != nil {
Expand Down Expand Up @@ -1298,11 +1311,14 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
}()
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
defer cancel()
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {})

copts := ConnectOptions{ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)}
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
defer ct.Close(fmt.Errorf("closed manually by test"))

str, err := ct.NewStream(connectCtx, &CallHdr{})
if err != nil {
t.Fatalf("Error while creating stream: %v", err)
Expand Down Expand Up @@ -2180,7 +2196,11 @@ func (s) TestClientHandshakeInfo(t *testing.T) {
defer cancel()
creds := &attrTransportCreds{}

tr, err := NewClientTransport(ctx, context.Background(), addr, ConnectOptions{TransportCredentials: creds}, func() {}, func(GoAwayReason) {}, func() {})
copts := ConnectOptions{
TransportCredentials: creds,
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
Expand Down Expand Up @@ -2217,7 +2237,11 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
}

tr, err := NewClientTransport(ctx, context.Background(), addr, ConnectOptions{Dialer: dialer}, func() {}, func(GoAwayReason) {}, func() {})
copts := ConnectOptions{
Dialer: dialer,
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
Expand Down

0 comments on commit 786e6be

Please sign in to comment.