Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.1: sql: fix leak in memory accounting around TxnFingerprintIDCache #121873

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 1 addition & 5 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,6 @@ func (s *Server) newConnExecutor(
MaxHist: memMetrics.TxnMaxBytesHist,
Settings: s.cfg.Settings,
})
txnFingerprintIDCacheAcc := sessionMon.MakeBoundAccount()

nodeIDOrZero, _ := s.cfg.NodeInfo.NodeID.OptionalNodeID()
ex := &connExecutor{
Expand Down Expand Up @@ -1100,8 +1099,7 @@ func (s *Server) newConnExecutor(
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache,
totalActiveTimeStopWatch: timeutil.NewStopWatch(),
txnFingerprintIDCache: NewTxnFingerprintIDCache(ctx, s.cfg.Settings, &txnFingerprintIDCacheAcc),
txnFingerprintIDAcc: &txnFingerprintIDCacheAcc,
txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings),
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -1330,7 +1328,6 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.mu.IdleInSessionTimeout.Stop()
ex.mu.IdleInTransactionSessionTimeout.Stop()

ex.txnFingerprintIDAcc.Close(ctx)
if closeType != panicClose {
ex.state.mon.Stop(ctx)
ex.sessionPreparedMon.Stop(ctx)
Expand Down Expand Up @@ -1746,7 +1743,6 @@ type connExecutor struct {
// txnFingerprintIDCache is used to track the most recent
// txnFingerprintIDs executed in this session.
txnFingerprintIDCache *TxnFingerprintIDCache
txnFingerprintIDAcc *mon.BoundAccount

// totalActiveTimeStopWatch tracks the total active time of the session.
// This is defined as the time spent executing transactions and statements.
Expand Down
10 changes: 2 additions & 8 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3213,13 +3213,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now())
transactionFingerprintID :=
appstatspb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum())

err := ex.txnFingerprintIDCache.Add(ctx, transactionFingerprintID)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err)
}
}
ex.txnFingerprintIDCache.Add(transactionFingerprintID)

ex.statsCollector.EndTransaction(
ctx,
Expand All @@ -3235,7 +3229,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
)
}

err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart, txnErr)
err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart, txnErr)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
Expand Down
31 changes: 4 additions & 27 deletions pkg/sql/txn_fingerprint_id_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,10 @@
package sql

import (
"context"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

Expand All @@ -40,22 +36,13 @@ type TxnFingerprintIDCache struct {

mu struct {
syncutil.RWMutex
acc *mon.BoundAccount
cache *cache.UnorderedCache
}
}

const (
cacheEntrySize = int64(unsafe.Sizeof(cache.Entry{}))
txnFingerprintIDSize = int64(unsafe.Sizeof(appstatspb.TransactionFingerprintID(0)))
)

// NewTxnFingerprintIDCache returns a new TxnFingerprintIDCache.
func NewTxnFingerprintIDCache(
ctx context.Context, st *cluster.Settings, acc *mon.BoundAccount,
) *TxnFingerprintIDCache {
func NewTxnFingerprintIDCache(st *cluster.Settings) *TxnFingerprintIDCache {
b := &TxnFingerprintIDCache{st: st}
b.mu.acc = acc
b.mu.cache = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, _, _ interface{}) bool {
Expand All @@ -66,28 +53,18 @@ func NewTxnFingerprintIDCache(
capacity := TxnFingerprintIDCacheCapacity.Get(&st.SV)
return int64(size) > capacity
},
OnEvictedEntry: func(entry *cache.Entry) {
// We must be holding the mutex already because this callback is
// executed during Cache.Add which we surround with the lock.
b.mu.AssertHeld()
b.mu.acc.Shrink(ctx, cacheEntrySize+txnFingerprintIDSize)
},
})
return b
}

// Add adds a TxnFingerprintID to the cache, truncating the cache to the cache's
// capacity if necessary.
func (b *TxnFingerprintIDCache) Add(
ctx context.Context, id appstatspb.TransactionFingerprintID,
) error {
func (b *TxnFingerprintIDCache) Add(id appstatspb.TransactionFingerprintID) {
b.mu.Lock()
defer b.mu.Unlock()
if err := b.mu.acc.Grow(ctx, cacheEntrySize+txnFingerprintIDSize); err != nil {
return err
}
// TODO(yuzefovich): we should perform memory accounting for this. Note that
// it can be quite tricky to get right - see #121844.
b.mu.cache.Add(id, nil /* value */)
return nil
}

// GetAllTxnFingerprintIDs returns a slice of all TxnFingerprintIDs in the cache.
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/txn_fingerprint_id_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
d.ScanArgs(t, "capacity", &capacity)

st := cluster.MakeTestingClusterSettings()
txnFingerprintIDCache = NewTxnFingerprintIDCache(ctx, st, nil /* acc */)
txnFingerprintIDCache = NewTxnFingerprintIDCache(st)

TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity))

Expand All @@ -66,9 +66,7 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
require.NoError(t, err)
txnFingerprintID := appstatspb.TransactionFingerprintID(id)

err = txnFingerprintIDCache.Add(ctx, txnFingerprintID)
require.NoError(t, err)

txnFingerprintIDCache.Add(txnFingerprintID)
return fmt.Sprintf("size: %d", txnFingerprintIDCache.size())

case "show":
Expand Down