diff --git a/dmap_atomic.go b/dmap_atomic.go index 6a19865b..e417a435 100644 --- a/dmap_atomic.go +++ b/dmap_atomic.go @@ -23,14 +23,12 @@ import ( ) func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error) { - err := db.lockWithTimeout(w.dmap, w.key, time.Minute) - if err != nil { - return 0, err - } + atomicKey := w.dmap + w.key + db.locker.Lock(atomicKey) defer func() { - err = db.unlock(w.dmap, w.key) + err := db.locker.Unlock(atomicKey) if err != nil { - db.log.V(2).Printf("[ERROR] Failed to release the lock for key: %s: %v", w.key, err) + db.log.V(2).Printf("[ERROR] Failed to release the lock for key: %s on DMap: %s: %v", w.key, w.dmap, err) } }() @@ -98,14 +96,12 @@ func (dm *DMap) Decr(key string, delta int) (int, error) { } func (db *Olric) getPut(w *writeop) ([]byte, error) { - err := db.lockWithTimeout(w.dmap, w.key, time.Minute) - if err != nil { - return nil, err - } + atomicKey := w.dmap + w.key + db.locker.Lock(atomicKey) defer func() { - err = db.unlock(w.dmap, w.key) + err := db.locker.Unlock(atomicKey) if err != nil { - db.log.V(2).Printf("[ERROR] Failed to release the lock for key: %s: %v", w.key, err) + db.log.V(2).Printf("[ERROR] Failed to release the lock for key: %s on DMap: %s: %v", w.key, w.dmap, err) } }() diff --git a/dmap_put.go b/dmap_put.go index 17888067..82bc5aee 100644 --- a/dmap_put.go +++ b/dmap_put.go @@ -27,6 +27,11 @@ import ( "github.com/buraksezer/olric/internal/storage" ) +const ( + IfNotExist = 1 << iota + IfExist +) + var ErrWriteQuorum = errors.New("write quorum cannot be reached") // writeop contains various values whose participate a write operation. @@ -36,7 +41,7 @@ type writeop struct { value []byte timestamp int64 timeout time.Duration - condition PutCondition + flags int } // fromReq generates a new protocol message from writeop instance. @@ -157,12 +162,12 @@ func (db *Olric) callPutOnCluster(hkey uint64, w *writeop) error { defer dm.Unlock() // Only set the key if it does not already exist. - if w.condition.IfNotExist && dm.storage.Check(hkey) { + if w.flags&IfNotExist != 0 && dm.storage.Check(hkey) { return nil } // Only set the key if it already exist. - if w.condition.IfExist && !dm.storage.Check(hkey) { + if w.flags&IfExist != 0 && !dm.storage.Check(hkey) { return nil } @@ -211,21 +216,29 @@ func (db *Olric) put(opcode protocol.OpCode, w *writeop) error { return err } -// PutEx sets the value for the given key with TTL. It overwrites any previous -// value for that key. It's thread-safe. The key has to be string. Value type -// is arbitrary. It is safe to modify the contents of the arguments after -// Put returns but not before. -func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) error { +func (dm *DMap) prepareWriteop(key string, value interface{}, timeout time.Duration, flags int) (*writeop, error) { val, err := dm.db.serializer.Marshal(value) if err != nil { - return err + return nil, err } - w := &writeop{ + return &writeop{ dmap: dm.name, key: key, value: val, timestamp: time.Now().UnixNano(), timeout: timeout, + flags: flags, + }, nil +} + +// PutEx sets the value for the given key with TTL. It overwrites any previous +// value for that key. It's thread-safe. The key has to be string. Value type +// is arbitrary. It is safe to modify the contents of the arguments after +// Put returns but not before. 
+func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) error { + w, err := dm.prepareWriteop(key, value, timeout, 0) + if err != nil { + return err } return dm.db.put(protocol.OpPutEx, w) } @@ -235,42 +248,33 @@ func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) erro // is arbitrary. It is safe to modify the contents of the arguments after // Put returns but not before. func (dm *DMap) Put(key string, value interface{}) error { - return dm.PutEx(key, value, nilTimeout) + w, err := dm.prepareWriteop(key, value, nilTimeout, 0) + if err != nil { + return err + } + return dm.db.put(protocol.OpPut, w) } -type PutCondition struct { - IfNotExist bool - IfExist bool - TTL time.Duration +func (dm *DMap) PutIf(key string, value interface{}, flags int) error { + w, err := dm.prepareWriteop(key, value, nilTimeout, flags) + if err != nil { + return err + } + return dm.db.put(protocol.OpPut, w) } -func (dm *DMap) PutIf(key string, value interface{}, condition PutCondition) error { - val, err := dm.db.serializer.Marshal(value) +func (dm *DMap) PutIfEx(key string, value interface{}, timeout time.Duration, flags int) error { + w, err := dm.prepareWriteop(key, value, timeout, flags) if err != nil { return err } - w := &writeop{ - dmap: dm.name, - key: key, - value: val, - timestamp: time.Now().UnixNano(), - condition: condition, - } return dm.db.put(protocol.OpPut, w) } func (db *Olric) exPutOperation(req *protocol.Message) *protocol.Message { w := &writeop{} w.fromReq(req) - err := db.put(protocol.OpPut, w) - return db.prepareResponse(req, err) -} - -func (db *Olric) exPutExOperation(req *protocol.Message) *protocol.Message { - w := &writeop{} - w.fromReq(req) - err := db.put(protocol.OpPutEx, w) - return db.prepareResponse(req, err) + return db.prepareResponse(req, db.put(req.Op, w)) } func (db *Olric) putReplicaOperation(req *protocol.Message) *protocol.Message { diff --git a/dmap_put_test.go b/dmap_put_test.go index 399e6702..2536022b 100644 --- a/dmap_put_test.go +++ b/dmap_put_test.go @@ -212,17 +212,15 @@ func TestDMap_PutIfNotExist(t *testing.T) { t.Fatalf("Expected nil. Got: %v", err) } - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { err = dm.Put(bkey(i), bval(i)) if err != nil { t.Fatalf("Expected nil. Got: %v", err) } } - c := PutCondition{ - IfNotExist: true, - } + for i := 0; i < 10; i++ { - err = dm.PutIf(bkey(i), bval(i*2), c) + err = dm.PutIf(bkey(i), bval(i*2), IfNotExist) if err != nil { t.Fatalf("Expected nil. Got: %v", err) } @@ -255,11 +253,8 @@ func TestDMap_PutIfExist(t *testing.T) { if err != nil { t.Fatalf("Expected nil. Got: %v", err) } - c := PutCondition{ - IfExist: true, - } for i := 0; i < 10; i++ { - err = dm.PutIf(bkey(i), bval(i*2), c) + err = dm.PutIf(bkey(i), bval(i*2), IfExist) if err != nil { t.Fatalf("Expected nil. Got: %v", err) } @@ -272,4 +267,3 @@ func TestDMap_PutIfExist(t *testing.T) { } } } - diff --git a/internal/locker/locker.go b/internal/locker/locker.go new file mode 100644 index 00000000..0b22ddfa --- /dev/null +++ b/internal/locker/locker.go @@ -0,0 +1,112 @@ +/* +Package locker provides a mechanism for creating finer-grained locking to help +free up more global locks to handle other tasks. + +The implementation looks close to a sync.Mutex, however the user must provide a +reference to use to refer to the underlying lock when locking and unlocking, +and unlock may generate an error. + +If a lock with a given name does not exist when `Lock` is called, one is +created. 
+Lock references are automatically cleaned up on `Unlock` if nothing else is +waiting for the lock. +*/ +package locker + +import ( + "errors" + "sync" + "sync/atomic" +) + +// ErrNoSuchLock is returned when the requested lock does not exist +var ErrNoSuchLock = errors.New("no such lock") + +// Locker provides a locking mechanism based on the passed in reference name +type Locker struct { + mu sync.Mutex + locks map[string]*lockCtr +} + +// lockCtr is used by Locker to represent a lock with a given name. +type lockCtr struct { + mu sync.Mutex + // waiters is the number of waiters waiting to acquire the lock + // this is int32 instead of uint32 so we can add `-1` in `dec()` + waiters int32 +} + +// inc increments the number of waiters waiting for the lock +func (l *lockCtr) inc() { + atomic.AddInt32(&l.waiters, 1) +} + +// dec decrements the number of waiters waiting on the lock +func (l *lockCtr) dec() { + atomic.AddInt32(&l.waiters, -1) +} + +// count gets the current number of waiters +func (l *lockCtr) count() int32 { + return atomic.LoadInt32(&l.waiters) +} + +// Lock locks the mutex +func (l *lockCtr) Lock() { + l.mu.Lock() +} + +// Unlock unlocks the mutex +func (l *lockCtr) Unlock() { + l.mu.Unlock() +} + +// New creates a new Locker +func New() *Locker { + return &Locker{ + locks: make(map[string]*lockCtr), + } +} + +// Lock locks a mutex with the given name. If it doesn't exist, one is created +func (l *Locker) Lock(name string) { + l.mu.Lock() + if l.locks == nil { + l.locks = make(map[string]*lockCtr) + } + + nameLock, exists := l.locks[name] + if !exists { + nameLock = &lockCtr{} + l.locks[name] = nameLock + } + + // increment the nameLock waiters while inside the main mutex + // this makes sure that the lock isn't deleted if `Lock` and `Unlock` are called concurrently + nameLock.inc() + l.mu.Unlock() + + // Lock the nameLock outside the main mutex so we don't block other operations + // once locked then we can decrement the number of waiters for this lock + nameLock.Lock() + nameLock.dec() +} + +// Unlock unlocks the mutex with the given name +// If the given lock is not being waited on by any other callers, it is deleted +func (l *Locker) Unlock(name string) error { + l.mu.Lock() + nameLock, exists := l.locks[name] + if !exists { + l.mu.Unlock() + return ErrNoSuchLock + } + + if nameLock.count() == 0 { + delete(l.locks, name) + } + nameLock.Unlock() + + l.mu.Unlock() + return nil +} diff --git a/internal/locker/locker_test.go b/internal/locker/locker_test.go new file mode 100644 index 00000000..39633c93 --- /dev/null +++ b/internal/locker/locker_test.go @@ -0,0 +1,161 @@ +package locker + +import ( + "math/rand" + "strconv" + "sync" + "testing" + "time" +) + +func TestLockCounter(t *testing.T) { + l := &lockCtr{} + l.inc() + + if l.waiters != 1 { + t.Fatal("counter inc failed") + } + + l.dec() + if l.waiters != 0 { + t.Fatal("counter dec failed") + } +} + +func TestLockerLock(t *testing.T) { + l := New() + l.Lock("test") + ctr := l.locks["test"] + + if ctr.count() != 0 { + t.Fatalf("expected waiters to be 0, got :%d", ctr.waiters) + } + + chDone := make(chan struct{}) + go func() { + l.Lock("test") + close(chDone) + }() + + chWaiting := make(chan struct{}) + go func() { + for range time.Tick(1 * time.Millisecond) { + if ctr.count() == 1 { + close(chWaiting) + break + } + } + }() + + select { + case <-chWaiting: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for lock waiters to be incremented") + } + + select { + case <-chDone: + t.Fatal("lock should not 
have returned while it was still held") + default: + } + + if err := l.Unlock("test"); err != nil { + t.Fatal(err) + } + + select { + case <-chDone: + case <-time.After(3 * time.Second): + t.Fatalf("lock should have completed") + } + + if ctr.count() != 0 { + t.Fatalf("expected waiters to be 0, got: %d", ctr.count()) + } +} + +func TestLockerUnlock(t *testing.T) { + l := New() + + l.Lock("test") + l.Unlock("test") + + chDone := make(chan struct{}) + go func() { + l.Lock("test") + close(chDone) + }() + + select { + case <-chDone: + case <-time.After(3 * time.Second): + t.Fatalf("lock should not be blocked") + } +} + +func TestLockerConcurrency(t *testing.T) { + l := New() + + var wg sync.WaitGroup + for i := 0; i <= 10000; i++ { + wg.Add(1) + go func() { + l.Lock("test") + // if there is a concurrency issue, will very likely panic here + l.Unlock("test") + wg.Done() + }() + } + + chDone := make(chan struct{}) + go func() { + wg.Wait() + close(chDone) + }() + + select { + case <-chDone: + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for locks to complete") + } + + // Since everything has unlocked this should not exist anymore + if ctr, exists := l.locks["test"]; exists { + t.Fatalf("lock should not exist: %v", ctr) + } +} + +func BenchmarkLocker(b *testing.B) { + l := New() + for i := 0; i < b.N; i++ { + l.Lock("test") + l.Unlock("test") + } +} + +func BenchmarkLockerParallel(b *testing.B) { + l := New() + b.SetParallelism(128) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + l.Lock("test") + l.Unlock("test") + } + }) +} + +func BenchmarkLockerMoreKeys(b *testing.B) { + l := New() + var keys []string + for i := 0; i < 64; i++ { + keys = append(keys, strconv.Itoa(i)) + } + b.SetParallelism(128) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + k := keys[rand.Intn(len(keys))] + l.Lock(k) + l.Unlock(k) + } + }) +} diff --git a/olric.go b/olric.go index 03d4eb42..8bb9b297 100644 --- a/olric.go +++ b/olric.go @@ -33,6 +33,7 @@ import ( "github.com/buraksezer/olric/hasher" "github.com/buraksezer/olric/internal/discovery" "github.com/buraksezer/olric/internal/flog" + locker2 "github.com/buraksezer/olric/internal/locker" "github.com/buraksezer/olric/internal/protocol" "github.com/buraksezer/olric/internal/storage" "github.com/buraksezer/olric/internal/transport" @@ -62,6 +63,7 @@ type Olric struct { config *config.Config log *flog.Logger hasher hasher.Hasher + locker *locker2.Locker // TODO: rename locker2 to locker serializer serializer.Serializer discovery *discovery.Discovery consistent *consistent.Consistent @@ -191,6 +193,7 @@ func New(c *config.Config) (*Olric, error) { log: flogger, config: c, hasher: c.Hasher, + locker: locker2.New(), serializer: c.Serializer, consistent: consistent.New(nil, cfg), client: client, @@ -327,7 +330,7 @@ func (db *Olric) Start() error { func (db *Olric) registerOperations() { // Put db.server.RegisterOperation(protocol.OpPut, db.exPutOperation) - db.server.RegisterOperation(protocol.OpPutEx, db.exPutExOperation) + db.server.RegisterOperation(protocol.OpPutEx, db.exPutOperation) db.server.RegisterOperation(protocol.OpPutReplica, db.putReplicaOperation) db.server.RegisterOperation(protocol.OpPutExReplica, db.putReplicaOperation)
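
The change in dmap_atomic.go replaces the distributed lock (and its one-minute timeout) with an in-process named mutex keyed on dmap+key, so concurrent Incr/Decr calls for the same key are serialized on the node running the operation without going through the lock subsystem. Below is a minimal usage sketch, assuming dm is an *olric.DMap obtained from a running instance; the key name, delta and goroutine count are illustrative, not part of the patch.

package example

import (
	"log"
	"sync"

	"github.com/buraksezer/olric"
)

// concurrentIncr fires a number of goroutines that all increment the same
// counter; the per-key lock taken in atomicIncrDecr keeps their
// read-modify-write cycles from interleaving.
func concurrentIncr(dm *olric.DMap) (int, error) {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := dm.Incr("counter", 1); err != nil {
				log.Printf("Incr failed: %v", err)
			}
		}()
	}
	wg.Wait()
	// Incr with a zero delta just reads the current value back.
	return dm.Incr("counter", 0)
}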
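
dmap_put.go drops the PutCondition struct in favour of bit flags (IfNotExist, IfExist) carried on the writeop, plus the PutIf and PutIfEx entry points built on the shared prepareWriteop helper. The sketch below shows how a caller would use them, again assuming dm is an *olric.DMap; the keys, values and TTL are made up. Note that with this patch an unmet condition is a silent no-op rather than an error.

package example

import (
	"time"

	"github.com/buraksezer/olric"
)

func conditionalPuts(dm *olric.DMap) error {
	// Write "session:42" only if it is not set yet.
	if err := dm.PutIf("session:42", "created", olric.IfNotExist); err != nil {
		return err
	}
	// Refresh it (new value plus a 10s TTL) only if it already exists.
	return dm.PutIfEx("session:42", "refreshed", 10*time.Second, olric.IfExist)
}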
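
internal/locker is the name-based mutex used above: Lock creates the entry on demand, waiters are counted so that Unlock can garbage-collect the entry once nobody is queued behind it, and Unlock on an unknown name returns ErrNoSuchLock. A sketch of the pattern as it could be used from inside the olric package (the package is internal, so it cannot be imported from outside the module); the withKeyLock helper is illustrative and not part of the patch.

package olric

import (
	"fmt"

	"github.com/buraksezer/olric/internal/locker"
)

// withKeyLock serializes fn against other holders of the same dmap+key name,
// mirroring what atomicIncrDecr and getPut do in this patch.
func withKeyLock(l *locker.Locker, dmap, key string, fn func() error) error {
	name := dmap + key
	l.Lock(name)
	defer func() {
		if err := l.Unlock(name); err != nil {
			fmt.Printf("failed to release lock for %s: %v\n", name, err)
		}
	}()
	return fn()
}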