Skip to content

Connection pool specific backports for v19 #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: release-19.0-github
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions go/pools/smartconnpool/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package smartconnpool
import (
"context"
"sync/atomic"
"time"
)

type Connection interface {
Expand All @@ -33,8 +32,8 @@ type Connection interface {

type Pooled[C Connection] struct {
next atomic.Pointer[Pooled[C]]
timeCreated time.Time
timeUsed time.Time
timeCreated timestamp
timeUsed timestamp
pool *ConnPool[C]

Conn C
Expand Down
103 changes: 68 additions & 35 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package smartconnpool

import (
"context"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -134,7 +133,6 @@ type ConnPool[C Connection] struct {
connect Connector[C]
// refresh is the callback to check whether the pool needs to be refreshed
refresh RefreshCheck

// maxCapacity is the maximum value to which capacity can be set; when the pool
// is re-opened, it defaults to this capacity
maxCapacity int64
Expand Down Expand Up @@ -381,19 +379,23 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {

if conn == nil {
var err error
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
conn, err = pool.connNew(context.Background())
if err != nil {
pool.closedConn()
return
}
} else {
conn.timeUsed = time.Now()
conn.timeUsed.update()

lifetime := pool.extendedMaxLifetime()
if lifetime > 0 && time.Until(conn.timeCreated.Add(lifetime)) < 0 {
if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
pool.Metrics.maxLifetimeClosed.Add(1)
conn.Close()
if err := pool.connReopen(context.Background(), conn, conn.timeUsed); err != nil {
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
pool.closedConn()
return
}
Expand All @@ -418,12 +420,30 @@ func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
return false
}

func (pool *ConnPool[C]) pop(stack *connStack[C]) *Pooled[C] {
// retry-loop: pop a connection from the stack and atomically check whether
// its timeout has elapsed. If the timeout has elapsed, the borrow will fail,
// which means that a background worker has already marked this connection
// as stale and is in the process of shutting it down. If we successfully mark
// the timeout as borrowed, we know that background workers will not be able
// to expire this connection (even if it's still visible to them), so it's
// safe to return it
for conn, ok := stack.Pop(); ok; conn, ok = stack.Pop() {
if conn.timeUsed.borrow() {
return conn
}
}
return nil
}

func (pool *ConnPool[C]) tryReturnAnyConn() bool {
if conn, ok := pool.clean.Pop(); ok {
if conn := pool.pop(&pool.clean); conn != nil {
conn.timeUsed.update()
return pool.tryReturnConn(conn)
}
for u := 0; u <= stackMask; u++ {
if conn, ok := pool.settings[u].Pop(); ok {
if conn := pool.pop(&pool.settings[u]); conn != nil {
conn.timeUsed.update()
return pool.tryReturnConn(conn)
}
}
Expand All @@ -439,15 +459,22 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
return time.Duration(maxLifetime) + time.Duration(extended)
}

func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Time) error {
var err error
func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) (err error) {
dbconn.Conn, err = pool.config.connect(ctx)
if err != nil {
return err
}

dbconn.timeUsed = now
dbconn.timeCreated = now
if setting := dbconn.Conn.Setting(); setting != nil {
err = dbconn.Conn.ApplySetting(ctx, setting)
if err != nil {
dbconn.Close()
return err
}
}

dbconn.timeCreated.set(now)
dbconn.timeUsed.set(now)
return nil
}

Expand All @@ -456,13 +483,14 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
if err != nil {
return nil, err
}
now := time.Now()
return &Pooled[C]{
timeCreated: now,
timeUsed: now,
pool: pool,
Conn: conn,
}, nil
pooled := &Pooled[C]{
pool: pool,
Conn: conn,
}
now := monotonicNow()
pooled.timeUsed.set(now)
pooled.timeCreated.set(now)
return pooled, nil
}

func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
Expand All @@ -475,7 +503,7 @@ func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {

for i := uint32(0); i <= stackMask; i++ {
pos := (i + start) & stackMask
if conn, ok := pool.settings[pos].Pop(); ok {
if conn := pool.pop(&pool.settings[pos]); conn != nil {
return conn
}
}
Expand Down Expand Up @@ -509,7 +537,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
pool.Metrics.getCount.Add(1)

// best case: if there's a connection in the clean stack, return it right away
if conn, ok := pool.clean.Pop(); ok {
if conn := pool.pop(&pool.clean); conn != nil {
pool.borrowed.Add(1)
return conn, nil
}
Expand Down Expand Up @@ -545,7 +573,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
err = conn.Conn.ResetSetting(ctx)
if err != nil {
conn.Close()
err = pool.connReopen(ctx, conn, time.Now())
err = pool.connReopen(ctx, conn, monotonicNow())
if err != nil {
pool.closedConn()
return nil, err
Expand All @@ -563,10 +591,10 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (

var err error
// best case: check if there's a connection in the setting stack where our Setting belongs
conn, _ := pool.settings[setting.bucket&stackMask].Pop()
conn := pool.pop(&pool.settings[setting.bucket&stackMask])
// if there's connection with our setting, try popping a clean connection
if conn == nil {
conn, _ = pool.clean.Pop()
conn = pool.pop(&pool.clean)
}
// otherwise try opening a brand new connection and we'll apply the setting to it
if conn == nil {
Expand Down Expand Up @@ -605,7 +633,7 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
err = conn.Conn.ResetSetting(ctx)
if err != nil {
conn.Close()
err = pool.connReopen(ctx, conn, time.Now())
err = pool.connReopen(ctx, conn, monotonicNow())
if err != nil {
pool.closedConn()
return nil, err
Expand Down Expand Up @@ -667,7 +695,7 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
// try closing from connections which are currently idle in the stacks
conn := pool.getFromSettingsStack(nil)
if conn == nil {
conn, _ = pool.clean.Pop()
conn = pool.pop(&pool.clean)
}
if conn == nil {
time.Sleep(delay)
Expand All @@ -689,21 +717,26 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
return
}

var conns []*Pooled[C]
mono := monotonicFromTime(now)

closeInStack := func(s *connStack[C]) {
conns = s.PopAll(conns[:0])
slices.Reverse(conns)

for _, conn := range conns {
if conn.timeUsed.Add(timeout).Sub(now) < 0 {
// Do a read-only best effort iteration of all the connection in this
// stack and atomically attempt to mark them as expired.
// Any connections that are marked as expired are _not_ removed from
// the stack; it's generally unsafe to remove nodes from the stack
// besides the head. When clients pop from the stack, they'll immediately
// notice the expired connection and ignore it.
// see: timestamp.expired
for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
if conn.timeUsed.expired(mono, timeout) {
pool.Metrics.idleClosed.Add(1)
conn.Close()
pool.closedConn()
continue
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
if err := pool.connReopen(context.Background(), conn, mono); err != nil {
pool.closedConn()
}
}

s.Push(conn)
}
}

Expand Down
Loading
Loading