Skip to content

Commit

Permalink
sessiondatapb: move SequenceCache to LocalOnlySessionData
Browse files Browse the repository at this point in the history
This actually affects the results of my session migration test, so I
think we actually need to include it.

Release note: None
  • Loading branch information
otan committed Aug 18, 2021
1 parent 5d2c91c commit 249b87e
Show file tree
Hide file tree
Showing 8 changed files with 538 additions and 128 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2617,7 +2617,7 @@ func (m *sessionDataMutator) SetNoticeDisplaySeverity(severity pgnotice.DisplayS

// initSequenceCache creates an empty sequence cache instance for the session.
func (m *sessionDataMutator) initSequenceCache() {
m.data.SequenceCache = sessiondata.SequenceCache{}
m.data.SequenceCache = sessiondatapb.SequenceCache{}
}

// SetIntervalStyle sets the IntervalStyle for the given session.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (p *planner) ExecCfg() *ExecutorConfig {
// GetOrInitSequenceCache returns the sequence cache for the session.
// If the sequence cache has not been used yet, it initializes the cache
// inside the session data.
func (p *planner) GetOrInitSequenceCache() sessiondata.SequenceCache {
func (p *planner) GetOrInitSequenceCache() sessiondatapb.SequenceCache {
if p.SessionData().SequenceCache == nil {
p.sessionDataMutator.initSequenceCache()
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/sessiondata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go_library(
srcs = [
"internal.go",
"search_path.go",
"sequence_cache.go",
"sequence_state.go",
"session_data.go",
],
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/sessiondata/session_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ type LocalUnmigratableSessionData struct {
// descpb.ID -> descpb.ID, but cannot be stored as such due to package
// dependencies. Temporary tables are not supported in session migrations.
DatabaseIDToTempSchemaID map[uint32]uint32
// SequenceCache stores sequence values which have been cached using the
// CACHE sequence option.
// Cached sequence options are not yet supported during session migrations.
SequenceCache SequenceCache

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sessiondatapb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
name = "sessiondatapb",
srcs = [
"local_only_session_data.go",
"sequence_cache.go",
"session_data.go",
],
embed = [":sessiondatapb_go_proto"],
Expand Down
600 changes: 505 additions & 95 deletions pkg/sql/sessiondatapb/local_only_session_data.pb.go

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,29 @@ message LocalOnlySessionData {
int64 serial_normalization_mode = 42 [(gogoproto.casttype)="SerialNormalizationMode"];
// NewSchemaChangerMode indicates whether to use the new schema changer.
int64 new_schema_changer_mode = 43 [(gogoproto.casttype)="NewSchemaChangerMode"];
// SequenceCache stores sequence values which have been cached using the
// CACHE sequence option.
map<uint32, sessiondatapb.SequenceCacheEntry> sequence_cache = 44 [(gogoproto.casttype)="SequenceCache"];

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
// be propagated to the remote nodes. If so, that parameter should live //
// in the SessionData message instead. //
///////////////////////////////////////////////////////////////////////////
}

// SequenceCacheEntry is an entry in a SequenceCache.
message SequenceCacheEntry {
// CachedVersion stores the descpb.DescriptorVersion that cached values are associated with.
// The version is checked to determine if cache needs to be invalidated. The version is stored as
// a uint32 to prevent an import cycle with the descpb package.
uint32 cached_version = 1;
// CurrentValue stores the present value of the sequence to be given out.
int64 current_value = 2;
// Increment stores the amount to Increment the currentVal by each time the
// currentVal is used. This value corresponds to descpb.TableDescriptor_SequenceOpts.Increment.
int64 increment = 3;
// NumValues represents the number of values to cache. The cache is considered
// to be empty when NumValues is 0.
int64 num_values = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sessiondata
package sessiondatapb

// SequenceCache stores sequence values that have already been created in KV
// and are available to be given out as sequence numbers. Values for sequences
Expand All @@ -20,22 +20,7 @@ package sessiondata
// new descriptor versions may not monotonically increase. For example, the sequence schema
// may be altered in a txn, so the cache sees a new version V and invalidates/repopulates itself. Then,
// the txn may get rolled back, so the cache will see version V-1 and invalidate/repopulate itself again.
type SequenceCache map[uint32]*sequenceCacheEntry

type sequenceCacheEntry struct {
// cachedVersion stores the descpb.DescriptorVersion that cached values are associated with.
// The version is checked to determine if cache needs to be invalidated. The version is stored as
// a uint32 to prevent an import cycle with the descpb package.
cachedVersion uint32
// currentValue stores the present value of the sequence to be given out.
currentValue int64
// increment stores the amount to increment the currentVal by each time the
// currentVal is used. This value corresponds to descpb.TableDescriptor_SequenceOpts.Increment.
increment int64
// numValues represents the number of values to cache. The cache is considered
// to be empty when numValues is 0.
numValues int64
}
type SequenceCache map[uint32]*SequenceCacheEntry

// NextValue fetches the next value in the sequence cache. If the values in the cache have all been
// given out or if the descriptor version has changed, then fetchNextValues() is used to repopulate the cache.
Expand All @@ -44,14 +29,14 @@ func (sc SequenceCache) NextValue(
) (int64, error) {
// Create entry for this sequence ID if there are no existing entries.
if _, found := sc[seqID]; !found {
sc[seqID] = &sequenceCacheEntry{}
sc[seqID] = &SequenceCacheEntry{}
}
cacheEntry := sc[seqID]

if cacheEntry.numValues > 0 && cacheEntry.cachedVersion == clientVersion {
cacheEntry.currentValue += cacheEntry.increment
cacheEntry.numValues--
return cacheEntry.currentValue - cacheEntry.increment, nil
if cacheEntry.NumValues > 0 && cacheEntry.CachedVersion == clientVersion {
cacheEntry.CurrentValue += cacheEntry.Increment
cacheEntry.NumValues--
return cacheEntry.CurrentValue - cacheEntry.Increment, nil
}

currentValue, increment, numValues, err := fetchNextValues()
Expand All @@ -61,9 +46,9 @@ func (sc SequenceCache) NextValue(

// One value must be returned, and the rest of the values are stored.
val := currentValue
cacheEntry.currentValue = currentValue + increment
cacheEntry.increment = increment
cacheEntry.numValues = numValues - 1
cacheEntry.cachedVersion = clientVersion
cacheEntry.CurrentValue = currentValue + increment
cacheEntry.Increment = increment
cacheEntry.NumValues = numValues - 1
cacheEntry.CachedVersion = clientVersion
return val, nil
}

0 comments on commit 249b87e

Please sign in to comment.