Skip to content

Commit

Permalink
sql: clean up txn fingerprint ID cache a bit
Browse files Browse the repository at this point in the history
This commit cleans up the txn fingerprint ID cache in the following
manner:
- it properly performs the memory accounting for each id. Previously, we
didn't connect the account to actual monitor (it was uninitialized) and
we only grew by 1 byte (which is significant underestimate). We now
remove unused monitor and create an account bound to the session
monitor.
- we now only store the id in the "key" part of the cache entry
(previously we stored it also in "value" which was redundant).

The main motivation for this change was reducing the memory leak that is
(hopefully) fixed in the following commit.

Release note: None
  • Loading branch information
yuzefovich committed Mar 7, 2024
1 parent f4dc2b5 commit 88ebd70
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 35 deletions.
6 changes: 5 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ func (s *Server) newConnExecutor(
memMetrics.TxnMaxBytesHist,
-1 /* increment */, noteworthyMemoryUsageBytes, s.cfg.Settings,
)
txnFingerprintIDCacheAcc := sessionMon.MakeBoundAccount()

nodeIDOrZero, _ := s.cfg.NodeInfo.NodeID.OptionalNodeID()
ex := &connExecutor{
Expand Down Expand Up @@ -1100,7 +1101,8 @@ func (s *Server) newConnExecutor(
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache,
totalActiveTimeStopWatch: timeutil.NewStopWatch(),
txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings, sessionRootMon),
txnFingerprintIDCache: NewTxnFingerprintIDCache(ctx, s.cfg.Settings, &txnFingerprintIDCacheAcc),
txnFingerprintIDAcc: &txnFingerprintIDCacheAcc,
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -1308,6 +1310,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.mu.IdleInSessionTimeout.Stop()
ex.mu.IdleInTransactionSessionTimeout.Stop()

ex.txnFingerprintIDAcc.Close(ctx)
if closeType != panicClose {
ex.state.mon.Stop(ctx)
ex.sessionPreparedMon.Stop(ctx)
Expand Down Expand Up @@ -1699,6 +1702,7 @@ type connExecutor struct {
// txnFingerprintIDCache is used to track the most recent
// txnFingerprintIDs executed in this session.
txnFingerprintIDCache *TxnFingerprintIDCache
txnFingerprintIDAcc *mon.BoundAccount

// totalActiveTimeStopWatch tracks the total active time of the session.
// This is defined as the time spent executing transactions and statements.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3167,7 +3167,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
transactionFingerprintID :=
appstatspb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum())

err := ex.txnFingerprintIDCache.Add(transactionFingerprintID)
err := ex.txnFingerprintIDCache.Add(ctx, transactionFingerprintID)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err)
Expand Down
39 changes: 20 additions & 19 deletions pkg/sql/txn_fingerprint_id_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package sql

import (
"context"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -42,16 +43,19 @@ type TxnFingerprintIDCache struct {
acc *mon.BoundAccount
cache *cache.UnorderedCache
}

mon *mon.BytesMonitor
}

const (
cacheEntrySize = int64(unsafe.Sizeof(cache.Entry{}))
txnFingerprintIDSize = int64(unsafe.Sizeof(appstatspb.TransactionFingerprintID(0)))
)

// NewTxnFingerprintIDCache returns a new TxnFingerprintIDCache.
func NewTxnFingerprintIDCache(
st *cluster.Settings, parentMon *mon.BytesMonitor,
ctx context.Context, st *cluster.Settings, acc *mon.BoundAccount,
) *TxnFingerprintIDCache {
b := &TxnFingerprintIDCache{st: st}

b.mu.acc = acc
b.mu.cache = cache.NewUnorderedCache(cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, _, _ interface{}) bool {
Expand All @@ -63,29 +67,26 @@ func NewTxnFingerprintIDCache(
return int64(size) > capacity
},
OnEvictedEntry: func(entry *cache.Entry) {
b.mu.acc.Shrink(context.Background(), 1)
// We must be holding the mutex already because this callback is
// executed during Cache.Add which we surround with the lock.
b.mu.AssertHeld()
b.mu.acc.Shrink(ctx, cacheEntrySize+txnFingerprintIDSize)
},
})

monitor := mon.NewMonitorInheritWithLimit("txn-fingerprint-id-cache", 0 /* limit */, parentMon)
b.mon = monitor
b.mon.StartNoReserved(context.Background(), parentMon)

return b
}

// Add adds a TxnFingerprintID to the cache, truncating the cache to the cache's capacity
// if necessary.
func (b *TxnFingerprintIDCache) Add(value appstatspb.TransactionFingerprintID) error {
// Add adds a TxnFingerprintID to the cache, truncating the cache to the cache's
// capacity if necessary.
func (b *TxnFingerprintIDCache) Add(
ctx context.Context, id appstatspb.TransactionFingerprintID,
) error {
b.mu.Lock()
defer b.mu.Unlock()

if err := b.mu.acc.Grow(context.Background(), 1); err != nil {
if err := b.mu.acc.Grow(ctx, cacheEntrySize+txnFingerprintIDSize); err != nil {
return err
}

b.mu.cache.Add(value, value)

b.mu.cache.Add(id, nil /* value */)
return nil
}

Expand All @@ -105,7 +106,7 @@ func (b *TxnFingerprintIDCache) GetAllTxnFingerprintIDs() []appstatspb.Transacti
txnFingerprintIDsRemoved := make([]appstatspb.TransactionFingerprintID, 0)

b.mu.cache.Do(func(entry *cache.Entry) {
id := entry.Value.(appstatspb.TransactionFingerprintID)
id := entry.Key.(appstatspb.TransactionFingerprintID)

if int64(len(txnFingerprintIDs)) == size {
txnFingerprintIDsRemoved = append(txnFingerprintIDsRemoved, id)
Expand Down
17 changes: 3 additions & 14 deletions pkg/sql/txn_fingerprint_id_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package sql
import (
"context"
"fmt"
"math"
"sort"
"strconv"
"testing"
Expand All @@ -28,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
Expand All @@ -46,17 +44,8 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
var capacity int
d.ScanArgs(t, "capacity", &capacity)

st := &cluster.Settings{}
monitor := mon.NewUnlimitedMonitor(
ctx,
"test",
mon.MemoryResource,
nil, /* currCount */
nil, /* maxHist */
math.MaxInt64,
st,
)
txnFingerprintIDCache = NewTxnFingerprintIDCache(st, monitor)
st := cluster.MakeTestingClusterSettings()
txnFingerprintIDCache = NewTxnFingerprintIDCache(ctx, st, nil /* acc */)

TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity))

Expand All @@ -77,7 +66,7 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) {
require.NoError(t, err)
txnFingerprintID := appstatspb.TransactionFingerprintID(id)

err = txnFingerprintIDCache.Add(txnFingerprintID)
err = txnFingerprintIDCache.Add(ctx, txnFingerprintID)
require.NoError(t, err)

return fmt.Sprintf("size: %d", txnFingerprintIDCache.size())
Expand Down

0 comments on commit 88ebd70

Please sign in to comment.