Skip to content

Commit

Permalink
Merge pull request #121873 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1-121847

release-24.1: sql: fix leak in memory accounting around TxnFingerprintIDCache (#121873)

Co-Authored-By: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
yuzefovich and yuzefovich committed Apr 8, 2024
2 parents 745b201 + 86d4d9f commit 660bfd9
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 44 deletions.
6 changes: 1 addition & 5 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,6 @@ func (s *Server) newConnExecutor(
MaxHist: memMetrics.TxnMaxBytesHist,
Settings: s.cfg.Settings,
})
txnFingerprintIDCacheAcc := sessionMon.MakeBoundAccount()

nodeIDOrZero, _ := s.cfg.NodeInfo.NodeID.OptionalNodeID()
ex := &connExecutor{
Expand Down Expand Up @@ -1100,8 +1099,7 @@ func (s *Server) newConnExecutor(
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache,
totalActiveTimeStopWatch: timeutil.NewStopWatch(),
txnFingerprintIDCache: NewTxnFingerprintIDCache(ctx, s.cfg.Settings, &txnFingerprintIDCacheAcc),
txnFingerprintIDAcc: &txnFingerprintIDCacheAcc,
txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings),
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -1330,7 +1328,6 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.mu.IdleInSessionTimeout.Stop()
ex.mu.IdleInTransactionSessionTimeout.Stop()

ex.txnFingerprintIDAcc.Close(ctx)
if closeType != panicClose {
ex.state.mon.Stop(ctx)
ex.sessionPreparedMon.Stop(ctx)
Expand Down Expand Up @@ -1746,7 +1743,6 @@ type connExecutor struct {
// txnFingerprintIDCache is used to track the most recent
// txnFingerprintIDs executed in this session.
txnFingerprintIDCache *TxnFingerprintIDCache
txnFingerprintIDAcc *mon.BoundAccount

// totalActiveTimeStopWatch tracks the total active time of the session.
// This is defined as the time spent executing transactions and statements.
Expand Down
10 changes: 2 additions & 8 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3213,13 +3213,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now())
transactionFingerprintID :=
appstatspb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum())

err := ex.txnFingerprintIDCache.Add(ctx, transactionFingerprintID)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err)
}
}
ex.txnFingerprintIDCache.Add(transactionFingerprintID)

ex.statsCollector.EndTransaction(
ctx,
Expand All @@ -3235,7 +3229,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
)
}

err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart, txnErr)
err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart, txnErr)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
Expand Down
31 changes: 4 additions & 27 deletions pkg/sql/txn_fingerprint_id_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,10 @@
package sql

import (
"context"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

Expand All @@ -40,22 +36,13 @@ type TxnFingerprintIDCache struct {

mu struct {
syncutil.RWMutex
acc *mon.BoundAccount
cache *cache.UnorderedCache
}
}

const (
cacheEntrySize = int64(unsafe.Sizeof(cache.Entry{}))
txnFingerprintIDSize = int64(unsafe.Sizeof(appstatspb.TransactionFingerprintID(0)))
)

// NewTxnFingerprintIDCache returns a new TxnFingerprintIDCache.
func NewTxnFingerprintIDCache(
ctx context.Context, st *cluster.Settings, acc *mon.BoundAccount,
) *TxnFingerprintIDCache {
func NewTxnFingerprintIDCache(st *cluster.Settings) *TxnFingerprintIDCache {
b := &TxnFingerprintIDCache{st: st}
b.mu.acc = acc
b.mu.cache = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, _, _ interface{}) bool {
Expand All @@ -66,28 +53,18 @@ func NewTxnFingerprintIDCache(
capacity := TxnFingerprintIDCacheCapacity.Get(&st.SV)
return int64(size) > capacity
},
OnEvictedEntry: func(entry *cache.Entry) {
// We must be holding the mutex already because this callback is
// executed during Cache.Add which we surround with the lock.
b.mu.AssertHeld()
b.mu.acc.Shrink(ctx, cacheEntrySize+txnFingerprintIDSize)
},
})
return b
}

// Add adds a TxnFingerprintID to the cache, truncating the cache to the cache's
// capacity if necessary.
func (b *TxnFingerprintIDCache) Add(
ctx context.Context, id appstatspb.TransactionFingerprintID,
) error {
func (b *TxnFingerprintIDCache) Add(id appstatspb.TransactionFingerprintID) {
b.mu.Lock()
defer b.mu.Unlock()
if err := b.mu.acc.Grow(ctx, cacheEntrySize+txnFingerprintIDSize); err != nil {
return err
}
// TODO(yuzefovich): we should perform memory accounting for this. Note that
// it can be quite tricky to get right - see #121844.
b.mu.cache.Add(id, nil /* value */)
return nil
}

// GetAllTxnFingerprintIDs returns a slice of all TxnFingerprintIDs in the cache.
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/txn_fingerprint_id_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
d.ScanArgs(t, "capacity", &capacity)

st := cluster.MakeTestingClusterSettings()
txnFingerprintIDCache = NewTxnFingerprintIDCache(ctx, st, nil /* acc */)
txnFingerprintIDCache = NewTxnFingerprintIDCache(st)

TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity))

Expand All @@ -66,9 +66,7 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
require.NoError(t, err)
txnFingerprintID := appstatspb.TransactionFingerprintID(id)

err = txnFingerprintIDCache.Add(ctx, txnFingerprintID)
require.NoError(t, err)

txnFingerprintIDCache.Add(txnFingerprintID)
return fmt.Sprintf("size: %d", txnFingerprintIDCache.size())

case "show":
Expand Down

0 comments on commit 660bfd9

Please sign in to comment.