Skip to content

Commit

Permalink
idle: move idleness manager to separate package and ~13s of tests int…
Browse files Browse the repository at this point in the history
…o it (#6566)
  • Loading branch information
dfawley committed Aug 23, 2023
1 parent 7d35b8e commit 81b9df2
Show file tree
Hide file tree
Showing 28 changed files with 416 additions and 500 deletions.
6 changes: 1 addition & 5 deletions balancer/rls/balancer_test.go
Expand Up @@ -1030,11 +1030,7 @@ func (s) TestUpdateStatePauses(t *testing.T) {
// the test would fail. Waiting for the channel to become READY here
// ensures that the test does not flake because of this rare sequence of
// events.
for s := cc.GetState(); s != connectivity.Ready; s = cc.GetState() {
if !cc.WaitForStateChange(ctx, s) {
t.Fatal("Timeout when waiting for connectivity state to reach READY")
}
}
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Cache the state changes seen up to this point.
states0 := ccWrapper.getStates()
Expand Down
4 changes: 2 additions & 2 deletions call.go
Expand Up @@ -27,10 +27,10 @@ import (
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
if err := cc.idlenessMgr.onCallBegin(); err != nil {
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return err
}
defer cc.idlenessMgr.onCallEnd()
defer cc.idlenessMgr.OnCallEnd()

// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
Expand Down
2 changes: 0 additions & 2 deletions channelz/service/service_sktopt_test.go
Expand Up @@ -128,8 +128,6 @@ func protoToSocketOption(skopts []*channelzpb.SocketOption) *channelz.SocketOpti
}

func (s) TestGetSocketOptions(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
ss := []*dummySocket{
{
socketOptions: &channelz.SocketOptionData{
Expand Down
24 changes: 3 additions & 21 deletions channelz/service/service_test.go
Expand Up @@ -51,12 +51,6 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

func cleanupWrapper(cleanup func() error, t *testing.T) {
if err := cleanup(); err != nil {
t.Error(err)
}
}

type protoToSocketOptFunc func([]*channelzpb.SocketOption) *channelz.SocketOptionData

// protoToSocketOpt is used in function socketProtoToStruct to extract socket option
Expand Down Expand Up @@ -311,8 +305,7 @@ func (s) TestGetTopChannels(t *testing.T) {
},
{},
}
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)

for _, c := range tcs {
id := channelz.RegisterChannel(c, nil, "")
defer channelz.RemoveEntry(id)
Expand Down Expand Up @@ -364,8 +357,7 @@ func (s) TestGetServers(t *testing.T) {
lastCallStartedTimestamp: time.Now().UTC(),
},
}
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)

for _, s := range ss {
id := channelz.RegisterServer(s, "")
defer channelz.RemoveEntry(id)
Expand Down Expand Up @@ -397,8 +389,6 @@ func (s) TestGetServers(t *testing.T) {
}

func (s) TestGetServerSockets(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
Expand Down Expand Up @@ -438,8 +428,6 @@ func (s) TestGetServerSockets(t *testing.T) {
// This test makes a GetServerSockets with a non-zero start ID, and expect only
// sockets with ID >= the given start ID.
func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
Expand Down Expand Up @@ -470,9 +458,6 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
}

func (s) TestGetChannel(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)

refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
ids := make([]*channelz.Identifier, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
Expand Down Expand Up @@ -584,8 +569,7 @@ func (s) TestGetSubChannel(t *testing.T) {
subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)
subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0")
)
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)

refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
ids := make([]*channelz.Identifier, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
Expand Down Expand Up @@ -662,8 +646,6 @@ func (s) TestGetSubChannel(t *testing.T) {
}

func (s) TestGetSocket(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
ss := []*dummySocket{
{
streamsStarted: 10,
Expand Down
17 changes: 14 additions & 3 deletions clientconn.go
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
Expand Down Expand Up @@ -266,7 +267,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// Configure idleness support with configured idle timeout or default idle
// timeout duration. Idleness can be explicitly disabled by the user, by
// setting the dial option to 0.
cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout)
cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger})

// Return early for non-blocking dials.
if !cc.dopts.block {
Expand Down Expand Up @@ -317,6 +318,16 @@ func (cc *ClientConn) addTraceEvent(msg string) {
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
}

type idler ClientConn

func (i *idler) EnterIdleMode() error {
return (*ClientConn)(i).enterIdleMode()
}

func (i *idler) ExitIdleMode() error {
return (*ClientConn)(i).exitIdleMode()
}

// exitIdleMode moves the channel out of idle mode by recreating the name
// resolver and load balancer.
func (cc *ClientConn) exitIdleMode() error {
Expand Down Expand Up @@ -639,7 +650,7 @@ type ClientConn struct {
channelzID *channelz.Identifier // Channelz identifier for the channel.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
idlenessMgr idlenessManager
idlenessMgr idle.Manager

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand Down Expand Up @@ -1268,7 +1279,7 @@ func (cc *ClientConn) Close() error {
rWrapper.close()
}
if idlenessMgr != nil {
idlenessMgr.close()
idlenessMgr.Close()
}

for ac := range conns {
Expand Down
69 changes: 18 additions & 51 deletions internal/channelz/funcs.go
Expand Up @@ -24,9 +24,7 @@
package channelz

import (
"context"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
Expand All @@ -40,8 +38,11 @@ const (
)

var (
db dbWrapper
idGen idGenerator
// IDGen is the global channelz entity ID generator. It should not be used
// outside this package except by tests.
IDGen IDGenerator

db dbWrapper
// EntryPerPage defines the number of channelz entries to be shown on a web page.
EntryPerPage = int64(50)
curState int32
Expand All @@ -52,14 +53,14 @@ var (
func TurnOn() {
if !IsOn() {
db.set(newChannelMap())
idGen.reset()
IDGen.Reset()
atomic.StoreInt32(&curState, 1)
}
}

// IsOn returns whether channelz data collection is on.
func IsOn() bool {
return atomic.CompareAndSwapInt32(&curState, 1, 1)
return atomic.LoadInt32(&curState) == 1
}

// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
Expand Down Expand Up @@ -97,43 +98,6 @@ func (d *dbWrapper) get() *channelMap {
return d.DB
}

// NewChannelzStorageForTesting initializes channelz data storage and id
// generator for testing purposes.
//
// Returns a cleanup function to be invoked by the test, which waits for up to
// 10s for all channelz state to be reset by the grpc goroutines when those
// entities get closed. This cleanup function helps with ensuring that tests
// don't mess up each other.
func NewChannelzStorageForTesting() (cleanup func() error) {
db.set(newChannelMap())
idGen.reset()

return func() error {
cm := db.get()
if cm == nil {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
cm.mu.RLock()
topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)
cm.mu.RUnlock()

if err := ctx.Err(); err != nil {
return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets)
}
if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 {
return nil
}
<-ticker.C
}
}
}

// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
// boolean indicating whether there's more top channels to be queried for.
//
Expand Down Expand Up @@ -193,7 +157,7 @@ func GetServer(id int64) *ServerMetric {
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
id := idGen.genID()
id := IDGen.genID()
var parent int64
isTopChannel := true
if pid != nil {
Expand Down Expand Up @@ -229,7 +193,7 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er
if pid == nil {
return nil, errors.New("a SubChannel's parent id cannot be nil")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefSubChannel, id, pid), nil
}
Expand All @@ -251,7 +215,7 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterServer(s Server, ref string) *Identifier {
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefServer, id, nil)
}
Expand All @@ -277,7 +241,7 @@ func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, e
if pid == nil {
return nil, errors.New("a ListenSocket's parent id cannot be 0")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefListenSocket, id, pid), nil
}
Expand All @@ -297,7 +261,7 @@ func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, e
if pid == nil {
return nil, errors.New("a NormalSocket's parent id cannot be 0")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefNormalSocket, id, pid), nil
}
Expand Down Expand Up @@ -776,14 +740,17 @@ func (c *channelMap) GetServer(id int64) *ServerMetric {
return sm
}

type idGenerator struct {
// IDGenerator is an incrementing atomic that tracks IDs for channelz entities.
type IDGenerator struct {
id int64
}

func (i *idGenerator) reset() {
// Reset resets the generated ID back to zero. Should only be used at
// initialization or by tests sensitive to the ID number.
func (i *IDGenerator) Reset() {
atomic.StoreInt64(&i.id, 0)
}

func (i *idGenerator) genID() int64 {
func (i *IDGenerator) genID() int64 {
return atomic.AddInt64(&i.id, 1)
}
5 changes: 5 additions & 0 deletions internal/channelz/types.go
Expand Up @@ -628,6 +628,7 @@ type tracedChannel interface {

type channelTrace struct {
cm *channelMap
clearCalled bool
createdTime time.Time
eventCount int64
mu sync.Mutex
Expand Down Expand Up @@ -656,6 +657,10 @@ func (c *channelTrace) append(e *TraceEvent) {
}

func (c *channelTrace) clear() {
if c.clearCalled {
return
}
c.clearCalled = true
c.mu.Lock()
for _, e := range c.events {
if e.RefID != 0 {
Expand Down

0 comments on commit 81b9df2

Please sign in to comment.