Skip to content

Commit

Permalink
Prepare for new DMap.Lock
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Oct 31, 2019
1 parent 4d79bbf commit 665be9f
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 56 deletions.
20 changes: 8 additions & 12 deletions dmap_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ import (
)

func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error) {
err := db.lockWithTimeout(w.dmap, w.key, time.Minute)
if err != nil {
return 0, err
}
atomicKey := w.dmap + w.key
db.locker.Lock(atomicKey)
defer func() {
err = db.unlock(w.dmap, w.key)
err := db.locker.Unlock(atomicKey)
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to release the lock for key: %s: %v", w.key, err)
db.log.V(2).Printf("[ERROR] Failed to release the lock for key: %s on DMap: %s: %v", w.key, w.dmap, err)
}
}()

Expand Down Expand Up @@ -98,14 +96,12 @@ func (dm *DMap) Decr(key string, delta int) (int, error) {
}

func (db *Olric) getPut(w *writeop) ([]byte, error) {
err := db.lockWithTimeout(w.dmap, w.key, time.Minute)
if err != nil {
return nil, err
}
atomicKey := w.dmap + w.key
db.locker.Lock(atomicKey)
defer func() {
err = db.unlock(w.dmap, w.key)
err := db.locker.Unlock(atomicKey)
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to release the lock for key: %s: %v", w.key, err)
db.log.V(2).Printf("[ERROR] Failed to release the lock for key: %s on DMap: %s: %v", w.key, w.dmap, err)
}
}()

Expand Down
70 changes: 37 additions & 33 deletions dmap_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
"github.com/buraksezer/olric/internal/storage"
)

const (
IfNotExist = 1 << iota
IfExist
)

var ErrWriteQuorum = errors.New("write quorum cannot be reached")

// writeop contains various values whose participate a write operation.
Expand All @@ -36,7 +41,7 @@ type writeop struct {
value []byte
timestamp int64
timeout time.Duration
condition PutCondition
flags int
}

// fromReq generates a new protocol message from writeop instance.
Expand Down Expand Up @@ -157,12 +162,12 @@ func (db *Olric) callPutOnCluster(hkey uint64, w *writeop) error {
defer dm.Unlock()

// Only set the key if it does not already exist.
if w.condition.IfNotExist && dm.storage.Check(hkey) {
if w.flags&IfNotExist != 0 && dm.storage.Check(hkey) {
return nil
}

// Only set the key if it already exist.
if w.condition.IfExist && !dm.storage.Check(hkey) {
if w.flags&IfExist != 0 && !dm.storage.Check(hkey) {
return nil
}

Expand Down Expand Up @@ -211,21 +216,29 @@ func (db *Olric) put(opcode protocol.OpCode, w *writeop) error {
return err
}

// PutEx sets the value for the given key with TTL. It overwrites any previous
// value for that key. It's thread-safe. The key has to be string. Value type
// is arbitrary. It is safe to modify the contents of the arguments after
// Put returns but not before.
func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) error {
func (dm *DMap) prepareWriteop(key string, value interface{}, timeout time.Duration, flags int) (*writeop, error) {
val, err := dm.db.serializer.Marshal(value)
if err != nil {
return err
return nil, err
}
w := &writeop{
return &writeop{
dmap: dm.name,
key: key,
value: val,
timestamp: time.Now().UnixNano(),
timeout: timeout,
flags: flags,
}, nil
}

// PutEx sets the value for the given key with TTL. It overwrites any previous
// value for that key. It's thread-safe. The key has to be string. Value type
// is arbitrary. It is safe to modify the contents of the arguments after
// Put returns but not before.
func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) error {
w, err := dm.prepareWriteop(key, value, timeout, 0)
if err != nil {
return err
}
return dm.db.put(protocol.OpPutEx, w)
}
Expand All @@ -235,42 +248,33 @@ func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) erro
// is arbitrary. It is safe to modify the contents of the arguments after
// Put returns but not before.
func (dm *DMap) Put(key string, value interface{}) error {
return dm.PutEx(key, value, nilTimeout)
w, err := dm.prepareWriteop(key, value, nilTimeout, 0)
if err != nil {
return err
}
return dm.db.put(protocol.OpPut, w)
}

type PutCondition struct {
IfNotExist bool
IfExist bool
TTL time.Duration
func (dm *DMap) PutIf(key string, value interface{}, flags int) error {
w, err := dm.prepareWriteop(key, value, nilTimeout, flags)
if err != nil {
return err
}
return dm.db.put(protocol.OpPut, w)
}

func (dm *DMap) PutIf(key string, value interface{}, condition PutCondition) error {
val, err := dm.db.serializer.Marshal(value)
func (dm *DMap) PutIfEx(key string, value interface{}, timeout time.Duration, flags int) error {
w, err := dm.prepareWriteop(key, value, timeout, flags)
if err != nil {
return err
}
w := &writeop{
dmap: dm.name,
key: key,
value: val,
timestamp: time.Now().UnixNano(),
condition: condition,
}
return dm.db.put(protocol.OpPut, w)
}

func (db *Olric) exPutOperation(req *protocol.Message) *protocol.Message {
w := &writeop{}
w.fromReq(req)
err := db.put(protocol.OpPut, w)
return db.prepareResponse(req, err)
}

func (db *Olric) exPutExOperation(req *protocol.Message) *protocol.Message {
w := &writeop{}
w.fromReq(req)
err := db.put(protocol.OpPutEx, w)
return db.prepareResponse(req, err)
return db.prepareResponse(req, db.put(req.Op, w))
}

func (db *Olric) putReplicaOperation(req *protocol.Message) *protocol.Message {
Expand Down
14 changes: 4 additions & 10 deletions dmap_put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,15 @@ func TestDMap_PutIfNotExist(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}

for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
err = dm.Put(bkey(i), bval(i))
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
}
c := PutCondition{
IfNotExist: true,
}

for i := 0; i < 10; i++ {
err = dm.PutIf(bkey(i), bval(i*2), c)
err = dm.PutIf(bkey(i), bval(i*2), IfNotExist)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -255,11 +253,8 @@ func TestDMap_PutIfExist(t *testing.T) {
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
c := PutCondition{
IfExist: true,
}
for i := 0; i < 10; i++ {
err = dm.PutIf(bkey(i), bval(i*2), c)
err = dm.PutIf(bkey(i), bval(i*2), IfExist)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand All @@ -272,4 +267,3 @@ func TestDMap_PutIfExist(t *testing.T) {
}
}
}

112 changes: 112 additions & 0 deletions internal/locker/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Package locker provides a mechanism for creating finer-grained locking to help
free up more global locks to handle other tasks.
The implementation looks close to a sync.Mutex, however the user must provide a
reference to use to refer to the underlying lock when locking and unlocking,
and unlock may generate an error.
If a lock with a given name does not exist when `Lock` is called, one is
created.
Lock references are automatically cleaned up on `Unlock` if nothing else is
waiting for the lock.
*/
package locker

import (
"errors"
"sync"
"sync/atomic"
)

// ErrNoSuchLock is returned when the requested lock does not exist
var ErrNoSuchLock = errors.New("no such lock")

// Locker provides a locking mechanism based on the passed in reference name
type Locker struct {
mu sync.Mutex
locks map[string]*lockCtr
}

// lockCtr is used by Locker to represent a lock with a given name.
type lockCtr struct {
mu sync.Mutex
// waiters is the number of waiters waiting to acquire the lock
// this is int32 instead of uint32 so we can add `-1` in `dec()`
waiters int32
}

// inc increments the number of waiters waiting for the lock
func (l *lockCtr) inc() {
atomic.AddInt32(&l.waiters, 1)
}

// dec decrements the number of waiters waiting on the lock
func (l *lockCtr) dec() {
atomic.AddInt32(&l.waiters, -1)
}

// count gets the current number of waiters
func (l *lockCtr) count() int32 {
return atomic.LoadInt32(&l.waiters)
}

// Lock locks the mutex
func (l *lockCtr) Lock() {
l.mu.Lock()
}

// Unlock unlocks the mutex
func (l *lockCtr) Unlock() {
l.mu.Unlock()
}

// New creates a new Locker
func New() *Locker {
return &Locker{
locks: make(map[string]*lockCtr),
}
}

// Lock locks a mutex with the given name. If it doesn't exist, one is created
func (l *Locker) Lock(name string) {
l.mu.Lock()
if l.locks == nil {
l.locks = make(map[string]*lockCtr)
}

nameLock, exists := l.locks[name]
if !exists {
nameLock = &lockCtr{}
l.locks[name] = nameLock
}

// increment the nameLock waiters while inside the main mutex
// this makes sure that the lock isn't deleted if `Lock` and `Unlock` are called concurrently
nameLock.inc()
l.mu.Unlock()

// Lock the nameLock outside the main mutex so we don't block other operations
// once locked then we can decrement the number of waiters for this lock
nameLock.Lock()
nameLock.dec()
}

// Unlock unlocks the mutex with the given name
// If the given lock is not being waited on by any other callers, it is deleted
func (l *Locker) Unlock(name string) error {
l.mu.Lock()
nameLock, exists := l.locks[name]
if !exists {
l.mu.Unlock()
return ErrNoSuchLock
}

if nameLock.count() == 0 {
delete(l.locks, name)
}
nameLock.Unlock()

l.mu.Unlock()
return nil
}
Loading

0 comments on commit 665be9f

Please sign in to comment.