Add support for recycling "leaf" connections #60

Merged · 2 commits · Mar 8, 2024

Changes from all commits
168 changes: 139 additions & 29 deletions balancer.go
@@ -47,19 +47,23 @@ func newBalancer(
picker func(prev picker.Picker, allConns conn.Conns) picker.Picker,
checker health.Checker,
pool connPool,
roundTripperMaxLifetime time.Duration,
) *balancer {
ctx, cancel := context.WithCancel(ctx)
balancer := &balancer{
ctx: ctx,
cancel: cancel,
pool: pool,
newPicker: picker,
healthChecker: checker,
resolverUpdates: make(chan struct{}, 1),
closed: make(chan struct{}),
connInfo: map[conn.Conn]connInfo{},
clock: internal.NewRealClock(),
ctx: ctx,
cancel: cancel,
pool: pool,
newPicker: picker,
healthChecker: checker,
roundTripperMaxLifetime: roundTripperMaxLifetime,
resolverUpdates: make(chan struct{}, 1),
recycleConns: make(chan struct{}, 1),
closed: make(chan struct{}),
connInfo: map[conn.Conn]connInfo{},
clock: internal.NewRealClock(),
}
balancer.connManager.updateFunc = balancer.updateConns
return balancer
}
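
For context on how the new parameter reaches callers, a minimal usage sketch. This diff shows only the internal roundTripperMaxLifetime argument threaded into newBalancer, so the public option name (WithRoundTripperMaxLifetime) and the httplb client surface used below are assumptions, not confirmed by the diff:

package main

import (
	"net/http"
	"time"

	"github.com/bufbuild/httplb"
)

func main() {
	// Assumed option name; recycles each underlying connection once it
	// has been alive for five minutes.
	client := httplb.NewClient(
		httplb.WithRoundTripperMaxLifetime(5 * time.Minute),
	)
	defer client.Close()

	req, err := http.NewRequest(http.MethodGet, "https://example.com/", nil)
	if err != nil {
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	resp.Body.Close()
}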

@@ -81,6 +85,8 @@ type balancer struct {
healthChecker health.Checker // +checklocksignore: mu is not required, but happens to always be held.
connManager connManager

roundTripperMaxLifetime time.Duration // +checklocksignore: mu is not required, but happens to always be held.

// NB: only set from tests
updateHook func([]resolver.Address, []conn.Conn)

@@ -93,6 +99,7 @@ type balancer struct {
latestAddrs atomic.Pointer[[]resolver.Address]
latestErr atomic.Pointer[error]
resolverUpdates chan struct{}
recycleConns chan struct{}
clock internal.Clock

mu sync.Mutex
@@ -106,6 +113,8 @@ type balancer struct {
connInfo map[conn.Conn]connInfo
// +checklocks:mu
reresolveLastCall time.Time
// +checklocks:mu
connsToRecycle []conn.Conn
}

func (b *balancer) UpdateHealthState(connection conn.Conn, state health.State) {
@@ -195,7 +204,7 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
for key, info := range b.connInfo {
delete(b.connInfo, key)
closer := info.closeChecker
info.cancelWarm()
info.cancel()
if closer != nil {
grp.Go(doClose(closer))
}
@@ -213,6 +222,25 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-b.recycleConns:
b.mu.Lock()
connsToRecycle := b.connsToRecycle
b.connsToRecycle = nil
b.mu.Unlock()
if len(connsToRecycle) > 0 {
// TODO: In practice, this will often recycle all connections at the same time
// since they are often created at the same time (when we get results
// from the resolver) and they all have the same max lifetime.
// This should be fine in the vast majority of cases, but it could pose
// resource-usage issues if, for example, there were 1000s of TLS
// connections (or even 100s of TLS connections that use client certs and
// a computationally expensive private key algorithm like RSA). We might
// consider rate-limiting how quickly we recycle connections, allowing
// them to live a little beyond their max lifetime so that we don't
// recycle everything all at once.
b.connManager.recycleConns(connsToRecycle)
}

case <-b.resolverUpdates:
addrs := b.latestAddrs.Load()
if addrs == nil {
@@ -242,7 +270,7 @@ func (b *balancer) receiveAddrs(ctx context.Context) {
if len(*addrs) > 0 {
addrsClone := make([]resolver.Address, len(*addrs))
copy(addrsClone, *addrs)
b.connManager.reconcileAddresses(addrsClone, b.updateConns)
b.connManager.reconcileAddresses(addrsClone)
}
}
}
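
The TODO above floats rate-limiting as a mitigation. Purely as an illustration (not part of this change), one way to pace recycling with golang.org/x/time/rate so that a batch of same-aged connections drains gradually:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Hypothetical pacing: at most one recycle per 100ms, with a burst
	// of 2, so connections may outlive their max lifetime slightly but
	// are never all torn down at the same instant.
	limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 2)
	conns := []string{"conn-1", "conn-2", "conn-3", "conn-4", "conn-5"}
	for _, c := range conns {
		if err := limiter.Wait(context.Background()); err != nil {
			return // context canceled
		}
		fmt.Println("recycling", c)
	}
}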
@@ -282,7 +310,7 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
// and omit it from newConns
info := b.connInfo[existing]
delete(b.connInfo, existing)
info.cancelWarm()
info.cancel()
if info.closeChecker != nil {
_ = info.closeChecker.Close()
}
@@ -291,8 +319,16 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
newConns = append(newConns, existing)
}
newConns = append(newConns, addConns...)
for i := range addConns {
connection := addConns[i]
b.initConnInfoLocked(addConns)
b.conns = newConns
b.newPickerLocked()
return addConns
}

// +checklocks:b.mu
func (b *balancer) initConnInfoLocked(conns []conn.Conn) {
for i := range conns {
connection := conns[i]
connCtx, connCancel := context.WithCancel(b.ctx)
healthChecker := b.healthChecker.New(connCtx, connection, b)
go func() {
@@ -301,11 +337,18 @@ func (b *balancer) updateConns(newAddrs []resolver.Address, removeConns []conn.C
b.warmedUp(connection)
}
}()
b.connInfo[connection] = connInfo{closeChecker: healthChecker, cancelWarm: connCancel}
cancel := connCancel
if b.roundTripperMaxLifetime != 0 {
timer := b.clock.AfterFunc(b.roundTripperMaxLifetime, func() {
b.recycle(connection)
})
cancel = func() {
connCancel()
timer.Stop()
}
}
b.connInfo[connection] = connInfo{closeChecker: healthChecker, cancel: cancel}
}
b.conns = newConns
b.newPickerLocked()
return addConns
}
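
The cancel wiring above can be seen in isolation below: a hedged sketch (function and variable names are illustrative, not from this diff) of folding the warm-up context's cancel and the max-lifetime timer's Stop into the single cleanup function that connInfo stores:

package main

import (
	"context"
	"fmt"
	"time"
)

// lifetimeCancel returns one cleanup function that both cancels the
// given context and stops the max-lifetime timer, if one was armed.
func lifetimeCancel(parent context.Context, maxLifetime time.Duration, onExpire func()) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	if maxLifetime == 0 {
		// Zero means "no max lifetime": there is no timer to stop.
		return ctx, cancel
	}
	timer := time.AfterFunc(maxLifetime, onExpire)
	return ctx, func() {
		cancel()
		timer.Stop() // a no-op if the timer already fired
	}
}

func main() {
	_, cancel := lifetimeCancel(context.Background(), 50*time.Millisecond, func() {
		fmt.Println("max lifetime reached: schedule connection for recycling")
	})
	defer cancel()
	time.Sleep(100 * time.Millisecond) // let the timer fire for demonstration
}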

// +checklocks:b.mu
@@ -392,19 +435,36 @@ func (b *balancer) setErrorPickerLocked(err error) {
b.pool.UpdatePicker(picker.ErrorPicker(err), false)
}

func (b *balancer) recycle(c conn.Conn) {
b.mu.Lock()
defer b.mu.Unlock()
b.connsToRecycle = append(b.connsToRecycle, c)
// Notify the background goroutine that there is a connection to recycle.
select {
case b.recycleConns <- struct{}{}:
default:
}
}
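
The non-blocking send above is a standard coalescing pattern: a buffered channel of size one acts as a dirty flag, so any number of recycle timers firing before the background goroutine wakes up collapse into a single wake-up, and the woken goroutine drains the whole connsToRecycle batch at once. A self-contained sketch of the pattern (names are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	// A size-one buffered channel acts as a coalescing "dirty" flag.
	wake := make(chan struct{}, 1)

	notify := func() {
		select {
		case wake <- struct{}{}: // first signal arms the flag
		default: // flag already armed; drop the duplicate signal
		}
	}

	// Many producers fire close together...
	for i := 0; i < 100; i++ {
		notify()
	}

	// ...but the consumer sees one wake-up and processes the whole batch.
	select {
	case <-wake:
		fmt.Println("woke once; drain the entire pending batch")
	case <-time.After(time.Second):
		fmt.Println("no signal")
	}
}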

type connInfo struct {
state health.State
warm bool
cancelWarm context.CancelFunc
state health.State
warm bool

// Cancels any in-progress warm-up and also cancels any timer
// for recycling the connection. Invoked when the connection
// is closed.
cancel context.CancelFunc
closeChecker io.Closer
}

type connManager struct {
// only modified by a single goroutine, so mu is not necessary
// only used by a single goroutine, so no mutex necessary
connsByAddr map[string][]conn.Conn

updateFunc func([]resolver.Address, []conn.Conn) []conn.Conn
}

func (c *connManager) reconcileAddresses(addrs []resolver.Address, updateFunc func([]resolver.Address, []conn.Conn) []conn.Conn) {
func (c *connManager) reconcileAddresses(addrs []resolver.Address) {
// TODO: future extension: make connection establishing strategy configurable
// (which would allow more sophisticated connection strategies in the face
// of, for example, layer-4 load balancers)
@@ -446,13 +506,63 @@ func (c *connManager) reconcileAddresses(addrs []resolver.Address, updateFunc fu
newAddrs = append(newAddrs, want...)
}

c.connsByAddr = remaining
c.doUpdate(newAddrs, toRemove)
}

func (c *connManager) doUpdate(newAddrs []resolver.Address, toRemove []conn.Conn) {
// we make a single call to update connections in batch to create a single
// new picker (avoids potential picker churn from making one change at a time)
newConns := updateFunc(newAddrs, toRemove)
// add newConns to remaining to compute new set of connections
for _, c := range newConns {
hostPort := c.Address().HostPort
remaining[hostPort] = append(remaining[hostPort], c)
newConns := c.updateFunc(newAddrs, toRemove)
// add newConns to set of connections
for _, cn := range newConns {
hostPort := cn.Address().HostPort
c.connsByAddr[hostPort] = append(c.connsByAddr[hostPort], cn)
}
c.connsByAddr = remaining
}

func (c *connManager) recycleConns(connsToRecycle []conn.Conn) {
var needToCompact bool
for i, cn := range connsToRecycle {
addr := cn.Address().HostPort
existing := c.connsByAddr[addr]
var found bool
for i, existingConn := range existing {
if existingConn == cn {
found = true
// remove cn from the slice
copy(existing[i:], existing[i+1:])
existing[len(existing)-1] = nil // don't leak memory
c.connsByAddr[addr] = existing[:len(existing)-1]
break
}
}
if !found {
// this connection has already been closed/removed
connsToRecycle[i] = nil
needToCompact = true
}
}
if needToCompact {
// TODO: when we can rely on Go 1.21, we should change this
// to use the new std-lib function slices.DeleteFunc
i := 0
for _, cn := range connsToRecycle {
if cn != nil {
connsToRecycle[i] = cn
i++
}
}
if i == 0 {
// nothing to actually recycle
return
}
connsToRecycle = connsToRecycle[:i]
}
newAddrs := make([]resolver.Address, len(connsToRecycle))
for i := range connsToRecycle {
newAddrs[i] = connsToRecycle[i].Address()
}

c.doUpdate(newAddrs, connsToRecycle)
}
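
The TODO in the compaction loop above points at slices.DeleteFunc, which landed in the standard library in Go 1.21; the shift-and-truncate bookkeeping then collapses to one call, sketched here:

package main

import (
	"fmt"
	"slices"
)

func main() {
	// nil marks entries that were already closed/removed, mirroring how
	// recycleConns flags stale connections before compacting the slice.
	conns := []any{"conn-a", nil, "conn-b", nil, "conn-c"}

	// Go 1.21+: one call replaces the manual compaction loop.
	conns = slices.DeleteFunc(conns, func(c any) bool { return c == nil })

	fmt.Println(conns) // [conn-a conn-b conn-c]
}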