Skip to content

Commit

Permalink
spanner: keep better track of max sessions
Browse files Browse the repository at this point in the history
The session pool did not keep proper track of the max number of sessions in use within one
maintenance window. This change ensures that the max number of sessions in use is actually kept
track of per window, and prevents the maintainer from deleting sessions too eagerly. This will
change the behavior of the session pool: the pool will keep more sessions for
a longer time, and sessions will be deleted and created less often.

The logic is now:
1. Keep track of the max number of sessions in use when a session is taken from the pool, instead
of probing this only at the moment that the maintainer is running. This prevents the session pool
from missing spikes in session usage.
2. Store the max number of sessions during a health check cycle in a maintenance window struct
containing one value for each cycle in an entire window of 10 cycles.
3. Only delete sessions after at least one full maintenance window has passed. After that,
the maintainer will delete sessions based on a rolling set of maxSessionsInUse for the last
10 minutes.
4. shrinkPool and replenishPool both take a max number of sessions to delete/create as an
argument in order to prevent the methods from deleting/creating more sessions than intended if the
number of sessions in the pool changes as a result of other operations on the pool.
5. If the shrinkPool method notices that the session pool has started creating new sessions,
the method will stop deleting any further sessions.

Fixes #1303, #1382, #1398 and #1406.

Change-Id: I20c0821a45b8ce02cb09a9dc492d389da03cccab
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/40150
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Shanika Kuruppu <skuruppu@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
  • Loading branch information
olavloite committed Oct 28, 2019
1 parent 192e002 commit 8097922
Show file tree
Hide file tree
Showing 2 changed files with 323 additions and 54 deletions.
205 changes: 163 additions & 42 deletions spanner/session.go
Expand Up @@ -451,6 +451,10 @@ type sessionPool struct {
SessionPoolConfig
// hc is the health checker
hc *healthChecker

// mw is the maintenance window containing statistics for the max number of
// sessions checked out of the pool during the last 10 minutes.
mw *maintenanceWindow
}

// newSessionPool creates a new session pool.
Expand All @@ -463,6 +467,7 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool,
valid: true,
mayGetSession: make(chan struct{}),
SessionPoolConfig: config,
mw: newMaintenanceWindow(),
}
if config.HealthCheckWorkers == 0 {
// With 10 workers and assuming average latency of 5ms for
Expand Down Expand Up @@ -598,8 +603,8 @@ func errGetSessionTimeout() error {
return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session")
}

// shouldPrepareWrite returns true if we should prepare more sessions for write.
func (p *sessionPool) shouldPrepareWrite() bool {
// shouldPrepareWriteLocked reports whether more sessions should be prepared
// for write. That is the case when the configured WriteSessions fraction of
// the opened sessions exceeds the number of idle write sessions plus the
// prepare requests that are already pending. Callers hold p.mu.
func (p *sessionPool) shouldPrepareWriteLocked() bool {
	wanted := float64(p.numOpened) * p.WriteSessions
	available := p.idleWriteList.Len() + int(p.prepareReqs)
	return wanted > float64(available)
}

Expand Down Expand Up @@ -676,7 +681,9 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
}
if s != nil {
s.setIdleList(nil)
numCheckedOut := p.currSessionsCheckedOutLocked()
p.mu.Unlock()
p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
// From here, session is no longer in idle list, so healthcheck
// workers won't destroy it. If healthcheck workers failed to
// schedule healthcheck for the session timely, do the check here.
Expand Down Expand Up @@ -705,9 +712,13 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {

// Take budget before the actual session creation.
p.numOpened++
// Creating a new session that will be returned directly to the client
// means that the max number of sessions in use also increases.
numCheckedOut := p.currSessionsCheckedOutLocked()
recordStat(ctx, OpenSessionCount, int64(p.numOpened))
p.createReqs++
p.mu.Unlock()
p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
if s, err = p.createSession(ctx); err != nil {
trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
return nil, toSpannerError(err)
Expand Down Expand Up @@ -745,7 +756,9 @@ func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, err
}
if s != nil {
s.setIdleList(nil)
numCheckedOut := p.currSessionsCheckedOutLocked()
p.mu.Unlock()
p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
// From here, session is no longer in idle list, so healthcheck
// workers won't destroy it. If healthcheck workers failed to
// schedule healthcheck for the session timely, do the check here.
Expand All @@ -772,9 +785,13 @@ func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, err

// Take budget before the actual session creation.
p.numOpened++
// Creating a new session that will be returned directly to the client
// means that the max number of sessions in use also increases.
numCheckedOut := p.currSessionsCheckedOutLocked()
recordStat(ctx, OpenSessionCount, int64(p.numOpened))
p.createReqs++
p.mu.Unlock()
p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut)
if s, err = p.createSession(ctx); err != nil {
trace.TracePrintf(ctx, nil, "Error creating session: %v", err)
return nil, toSpannerError(err)
Expand Down Expand Up @@ -846,13 +863,17 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool {
return false
}

// currSessionsCheckedOutLocked returns the number of sessions that are
// currently checked out of the pool: the number of opened sessions minus the
// sessions that sit on the idle read list or the idle write list. Callers hold
// p.mu.
func (p *sessionPool) currSessionsCheckedOutLocked() uint64 {
	idle := uint64(p.idleList.Len()) + uint64(p.idleWriteList.Len())
	return p.numOpened - idle
}

// hcHeap implements heap.Interface. It is used to create the priority queue for
// session healthchecks.
type hcHeap struct {
// sessions holds the sessions that are currently in the heap; Len reports
// the length of this slice.
sessions []*session
}

// Len impelemnts heap.Interface.Len.
// Len implements heap.Interface.Len by reporting how many sessions the heap
// currently holds.
func (h hcHeap) Len() int {
	n := len(h.sessions)
	return n
}
Expand Down Expand Up @@ -886,6 +907,78 @@ func (h *hcHeap) Pop() interface{} {
return s
}

// maintenanceWindowSize specifies the number of health check cycles that
// defines a maintenance window. The maintenance window keeps track of a
// rolling set of numbers for the number of maximum checked out sessions during
// the maintenance window. This is used by the maintainer to determine the
// number of sessions to create or delete at the end of each health check
// cycle.
const maintenanceWindowSize = 10

// maintenanceWindow contains the statistics that are gathered during a health
// check maintenance window. All methods are safe for concurrent use; access to
// the statistics is serialized by mu.
type maintenanceWindow struct {
	// mu protects maxSessionsCheckedOut.
	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that was
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that was actually needed by the pool to
	// serve the load during that cycle. The values are kept as a rolling set
	// containing the values for the past maintenanceWindowSize cycles, with
	// the current cycle at index 0 and the oldest cycle at the last index. The
	// maintainer uses these values to determine the number of sessions to keep
	// at the end of each cycle.
	maxSessionsCheckedOut [maintenanceWindowSize]uint64
}

// maxSessionsCheckedOutDuringWindow returns the maximum number of sessions
// that has been checked out during the last maintenance window of
// maintenanceWindowSize cycles. Until a complete window has passed since
// newMaintenanceWindow, the result is math.MaxUint64.
func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	var windowMax uint64
	for _, cycleMax := range mw.maxSessionsCheckedOut {
		if cycleMax > windowMax {
			windowMax = cycleMax
		}
	}
	return windowMax
}

// updateMaxSessionsCheckedOutDuringWindow updates the maximum number of
// sessions that has been checked out of the pool during the current cycle of
// the maintenance window. The stored value for the current cycle is only ever
// raised by this method; a currNumSessionsCheckedOut that is lower than the
// recorded maximum leaves it unchanged.
func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	if currNumSessionsCheckedOut > mw.maxSessionsCheckedOut[0] {
		mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
	}
}

// startNewCycle starts a new health check cycle with the specified number of
// checked out sessions as its initial value. The value of the oldest cycle is
// dropped from the window.
func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	// Shift every cycle one position towards the end of the window. The source
	// range is derived from maintenanceWindowSize so that the shift stays
	// correct if the window size is ever changed. copy handles the overlap
	// between source and destination.
	copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:maintenanceWindowSize-1])
	mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut
}

// newMaintenanceWindow creates a new maintenance window with all values for
// maxSessionsCheckedOut set to math.MaxUint64. This ensures that a complete
// maintenance window must pass before the maintainer will start to delete any
// sessions.
func newMaintenanceWindow() *maintenanceWindow {
	mw := &maintenanceWindow{}
	// Initialize the rolling window with max values to prevent the maintainer
	// from deleting sessions before a complete window has finished.
	for i := range mw.maxSessionsCheckedOut {
		mw.maxSessionsCheckedOut[i] = math.MaxUint64
	}
	return mw
}

// healthChecker performs periodical healthchecks on registered sessions.
type healthChecker struct {
// mu protects concurrent access to healthChecker.
Expand Down Expand Up @@ -1051,7 +1144,7 @@ func (hc *healthChecker) worker(i int) {
getNextForTx := func() *session {
hc.pool.mu.Lock()
defer hc.pool.mu.Unlock()
if hc.pool.shouldPrepareWrite() {
if hc.pool.shouldPrepareWriteLocked() {
if hc.pool.idleList.Len() > 0 && hc.pool.valid {
hc.mu.Lock()
defer hc.mu.Unlock()
Expand Down Expand Up @@ -1110,66 +1203,72 @@ func (hc *healthChecker) worker(i int) {
}
}

// maintainer maintains the maxSessionsInUse by a window of
// kWindowSize * sampleInterval. Based on this information, health checker will
// try to maintain the number of sessions by hc.
// maintainer maintains the number of sessions in the pool based on the session
// pool configuration and the current and historical number of sessions checked
// out of the pool. The maintainer will:
// 1. Ensure that the session pool contains at least MinOpened sessions.
// 2. If the current number of sessions in the pool exceeds the greatest number
// of checked out sessions (=sessions in use) during the last 10 minutes,
// and the delta is larger than MaxIdleSessions, the maintainer will reduce
// the number of sessions to maxSessionsInUseDuringWindow+MaxIdleSessions.
func (hc *healthChecker) maintainer() {
// Wait so that pool is ready.
// Wait until the pool is ready.
<-hc.ready

// A maintenance window is 10 iterations. The maintainer executes a loop
// every hc.sampleInterval, which defaults to 1 minute, which means that
// the default maintenance window is 10 minutes.
windowSize := uint64(10)

for iteration := uint64(0); ; iteration++ {
if hc.isClosing() {
hc.waitWorkers.Done()
return
}

// maxSessionsInUse is the maximum number of sessions in use
// concurrently over a period of time.
var maxSessionsInUse uint64

// Updates metrics.
hc.pool.mu.Lock()
currSessionsInUse := hc.pool.numOpened - uint64(hc.pool.idleList.Len()) - uint64(hc.pool.idleWriteList.Len())
currSessionsOpened := hc.pool.numOpened
maxIdle := hc.pool.MaxIdle
minOpened := hc.pool.MinOpened
hc.pool.mu.Unlock()

// Get the maximum number of sessions in use during the current
// maintenance window.
maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow()
hc.mu.Lock()
if iteration%windowSize == 0 || maxSessionsInUse < currSessionsInUse {
maxSessionsInUse = currSessionsInUse
}
sessionsToKeep := maxUint64(hc.pool.MinOpened,
minUint64(currSessionsOpened, hc.pool.MaxIdle+maxSessionsInUse))
ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval)
hc.maintainerCancel = cancel
hc.mu.Unlock()

// Replenish or Shrink pool if needed.
//
// Note: we don't need to worry about pending create session requests,
// we only need to sample the current sessions in use. The routines will
// not try to create extra / delete creating sessions.
if sessionsToKeep > currSessionsOpened {
hc.replenishPool(ctx, sessionsToKeep)
} else {
hc.shrinkPool(ctx, sessionsToKeep)
// Grow or shrink pool if needed.
// The number of sessions in the pool should be in the range
// [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow]
if currSessionsOpened < minOpened {
hc.growPool(ctx, minOpened)
} else if maxIdle+maxSessionsInUseDuringWindow > currSessionsOpened {
hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow)
}

select {
case <-ctx.Done():
case <-hc.done:
cancel()
}
// Cycle the maintenance window. This will remove the oldest cycle and
// add a new cycle at the beginning of the maintenance window with the
// currently checked out number of sessions as the max number of
// sessions in use in this cycle. This value will be increased during
// the next cycle if it increases.
hc.pool.mu.Lock()
currSessionsInUse := hc.pool.currSessionsCheckedOutLocked()
hc.pool.mu.Unlock()
hc.pool.mw.startNewCycle(currSessionsInUse)
}
}

// replenishPool is run if numOpened is less than sessionsToKeep, timeouts on
// sampleInterval.
func (hc *healthChecker) replenishPool(ctx context.Context, sessionsToKeep uint64) {
// growPool grows the number of sessions in the pool to the specified number of
// sessions. It timeouts on sampleInterval.
func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64) {
// Calculate the max number of sessions to create as a safeguard against
// other processes that could be deleting sessions concurrently.
hc.pool.mu.Lock()
maxSessionsToCreate := int(growToNumSessions - hc.pool.numOpened)
hc.pool.mu.Unlock()
var created int
for {
if ctx.Err() != nil {
return
Expand All @@ -1178,14 +1277,14 @@ func (hc *healthChecker) replenishPool(ctx context.Context, sessionsToKeep uint6
p := hc.pool
p.mu.Lock()
// Take budget before the actual session creation.
if sessionsToKeep <= p.numOpened {
if growToNumSessions <= p.numOpened || created >= maxSessionsToCreate {
p.mu.Unlock()
break
}
p.numOpened++
recordStat(ctx, OpenSessionCount, int64(p.numOpened))
p.createReqs++
shouldPrepareWrite := p.shouldPrepareWrite()
shouldPrepareWrite := p.shouldPrepareWriteLocked()
p.mu.Unlock()
var (
s *session
Expand All @@ -1198,6 +1297,7 @@ func (hc *healthChecker) replenishPool(ctx context.Context, sessionsToKeep uint6
continue
}
cancel()
created++
if shouldPrepareWrite {
prepareContext, cancel := context.WithTimeout(context.Background(), time.Minute)
if err = s.prepareForWrite(prepareContext); err != nil {
Expand All @@ -1212,17 +1312,37 @@ func (hc *healthChecker) replenishPool(ctx context.Context, sessionsToKeep uint6
}
}

// shrinkPool, scales down the session pool.
func (hc *healthChecker) shrinkPool(ctx context.Context, sessionsToKeep uint64) {
// shrinkPool scales down the session pool. The method will stop deleting
// sessions when shrinkToNumSessions number of sessions in the pool has
// been reached. The method will also stop deleting sessions if it detects that
// another process has started creating sessions for the pool again, for
// example through the take() method.
func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
hc.pool.mu.Lock()
maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions)
hc.pool.mu.Unlock()
var deleted int
var prevNumOpened uint64 = math.MaxUint64
for {
if ctx.Err() != nil {
return
}

p := hc.pool
p.mu.Lock()
// Check if the number of open sessions has increased. If it has, we
// should stop deleting sessions, as the load has increased and
// additional sessions are needed.
if p.numOpened >= prevNumOpened {
break
}
prevNumOpened = p.numOpened

if sessionsToKeep >= p.numOpened {
// Check on both whether we have reached the number of open sessions as
// well as the number of sessions to delete, in case sessions have been
// deleted by other methods because they have expired or deemed
// invalid.
if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete {
p.mu.Unlock()
break
}
Expand All @@ -1235,6 +1355,7 @@ func (hc *healthChecker) shrinkPool(ctx context.Context, sessionsToKeep uint64)
}
p.mu.Unlock()
if s != nil {
deleted++
// destroy session as expire.
s.destroy(true)
} else {
Expand Down

0 comments on commit 8097922

Please sign in to comment.