Skip to content

Commit

Permalink
Implement per-table auto increment locking.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktobey committed Apr 5, 2024
1 parent 1685140 commit fc32126
Showing 1 changed file with 42 additions and 39 deletions.
81 changes: 42 additions & 39 deletions go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package dsess

import (
"context"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess/mutexmap"
"github.com/dolthub/go-mysql-server/sql"
gmstypes "github.com/dolthub/go-mysql-server/sql/types"
"io"
"math"
"strings"
"sync"

"github.com/dolthub/go-mysql-server/sql"
gmstypes "github.com/dolthub/go-mysql-server/sql/types"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
Expand All @@ -35,8 +35,8 @@ import (

type AutoIncrementTracker struct {
dbName string
sequences map[string]uint64
mu *sync.Mutex
sequences *sync.Map // map[string]uint64
mm *mutexmap.MutexMap
}

var _ globalstate.AutoIncrementTracker = AutoIncrementTracker{}
Expand All @@ -48,8 +48,8 @@ var _ globalstate.AutoIncrementTracker = AutoIncrementTracker{}
func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb.Rootish) (AutoIncrementTracker, error) {
ait := AutoIncrementTracker{
dbName: dbName,
sequences: make(map[string]uint64),
mu: &sync.Mutex{},
sequences: &sync.Map{},
mm: mutexmap.NewMutexMap(),
}

for _, root := range roots {
Expand All @@ -71,8 +71,9 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb
return true, err
}

if seq > ait.sequences[tableName] {
ait.sequences[tableName] = seq
oldValue, loaded := ait.sequences.LoadOrStore(tableName, seq)
if loaded && seq > oldValue.(uint64) {
ait.sequences.Store(tableName, seq)
}

return false, nil
Expand All @@ -82,41 +83,47 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb
return AutoIncrementTracker{}, err
}
}

return ait, nil
}

func loadAutoIncValue(sequences *sync.Map, tableName string) uint64 {
tableName = strings.ToLower(tableName)
current, hasCurrent := sequences.Load(tableName)
if !hasCurrent {
return 0
}
return current.(uint64)
}

// Current returns the next value to be generated in the auto increment sequence for the table named
func (a AutoIncrementTracker) Current(tableName string) uint64 {
a.mu.Lock()
defer a.mu.Unlock()
return a.sequences[strings.ToLower(tableName)]
tableName = strings.ToLower(tableName)
return loadAutoIncValue(a.sequences, tableName)
}

// Next returns the next auto increment value for the table named using the provided value from an insert (which may
// be null or 0, in which case it will be generated from the sequence).
func (a AutoIncrementTracker) Next(tbl string, insertVal interface{}) (uint64, error) {
a.mu.Lock()
defer a.mu.Unlock()

tbl = strings.ToLower(tbl)

given, err := CoerceAutoIncrementValue(insertVal)
if err != nil {
return 0, err
}

curr := a.sequences[tbl]
unlocker := a.mm.Lock(tbl)
defer unlocker.Unlock()

curr := loadAutoIncValue(a.sequences, tbl)

if given == 0 {
// |given| is 0 or NULL
a.sequences[tbl]++
a.sequences.Store(tbl, curr+1)
return curr, nil
}

if given >= curr {
a.sequences[tbl] = given
a.sequences[tbl]++
a.sequences.Store(tbl, given+1)
return given, nil
}

Expand Down Expand Up @@ -152,14 +159,14 @@ func CoerceAutoIncrementValue(val interface{}) (uint64, error) {
// table. Otherwise, the update is silently disregarded. So far this matches the MySQL behavior, but Dolt uses the
// maximum value for this table across all branches.
func (a AutoIncrementTracker) Set(ctx *sql.Context, tableName string, table *doltdb.Table, ws ref.WorkingSetRef, newAutoIncVal uint64) (*doltdb.Table, error) {
a.mu.Lock()
defer a.mu.Unlock()

tableName = strings.ToLower(tableName)

existing := a.sequences[tableName]
unlocker := a.mm.Lock(tableName)
defer unlocker.Unlock()

existing := loadAutoIncValue(a.sequences, tableName)
if newAutoIncVal > existing {
a.sequences[strings.ToLower(tableName)] = newAutoIncVal
a.sequences.Store(tableName, newAutoIncVal)
return table.SetAutoIncrementValue(ctx, newAutoIncVal)
} else {
// If the value is not greater than the current tracker, we have more work to do
Expand Down Expand Up @@ -310,7 +317,7 @@ func (a AutoIncrementTracker) deepSet(ctx *sql.Context, tableName string, table
}
}

a.sequences[tableName] = maxAutoInc
a.sequences.Store(tableName, maxAutoInc)
return table, nil
}

Expand Down Expand Up @@ -351,27 +358,21 @@ func getMaxIndexValue(ctx context.Context, indexData durable.Index) (uint64, err

// AddNewTable initializes a new table with an auto increment column to the tracker, as necessary
func (a AutoIncrementTracker) AddNewTable(tableName string) {
a.mu.Lock()
defer a.mu.Unlock()

tableName = strings.ToLower(tableName)
// only initialize the sequence for this table if no other branch has such a table
if _, ok := a.sequences[tableName]; !ok {
a.sequences[tableName] = uint64(1)
}
a.sequences.LoadOrStore(tableName, uint64(1))
}

// DropTable drops the table with the name given.
// To establish the new auto increment value, callers must also pass all other working sets in scope that may include
// a table with the same name, omitting the working set that just deleted the table named.
func (a AutoIncrementTracker) DropTable(ctx *sql.Context, tableName string, wses ...*doltdb.WorkingSet) error {
a.mu.Lock()
defer a.mu.Unlock()

tableName = strings.ToLower(tableName)

// reset sequence to the minimum value
a.sequences[tableName] = 1
unlocker := a.mm.Lock(tableName)
defer unlocker.Unlock()

newHighestValue := uint64(1)

// Get the new highest value from all tables in the working sets given
for _, ws := range wses {
Expand All @@ -395,11 +396,13 @@ func (a AutoIncrementTracker) DropTable(ctx *sql.Context, tableName string, wses
return err
}

if seq > a.sequences[tableName] {
a.sequences[tableName] = seq
if seq > newHighestValue {
newHighestValue = seq
}
}
}

a.sequences.Store(tableName, newHighestValue)

return nil
}

0 comments on commit fc32126

Please sign in to comment.