Skip to content

Commit

Permalink
PutIf and PutIfEx commands properly implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Nov 1, 2019
1 parent 665be9f commit dbcbc8d
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 51 deletions.
48 changes: 29 additions & 19 deletions dmap_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error)
return 0, err
}
w.value = nval
err = db.put(protocol.OpPut, w)
err = db.put(w)
if err != nil {
return 0, err
}
Expand All @@ -78,19 +78,23 @@ func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error)
// Incr atomically increments key by delta. The return value is the new value after being incremented or an error.
func (dm *DMap) Incr(key string, delta int) (int, error) {
w := &writeop{
dmap: dm.name,
key: key,
timestamp: time.Now().UnixNano(),
opcode: protocol.OpPut,
replicaOpcode: protocol.OpPutReplica,
dmap: dm.name,
key: key,
timestamp: time.Now().UnixNano(),
}
return dm.db.atomicIncrDecr("incr", w, delta)
}

// Decr atomically decrements key by delta. The return value is the new value after being decremented or an error.
func (dm *DMap) Decr(key string, delta int) (int, error) {
w := &writeop{
dmap: dm.name,
key: key,
timestamp: time.Now().UnixNano(),
opcode: protocol.OpPut,
replicaOpcode: protocol.OpPutReplica,
dmap: dm.name,
key: key,
timestamp: time.Now().UnixNano(),
}
return dm.db.atomicIncrDecr("decr", w, delta)
}
Expand All @@ -109,7 +113,7 @@ func (db *Olric) getPut(w *writeop) ([]byte, error) {
if err != nil && err != ErrKeyNotFound {
return nil, err
}
err = db.put(protocol.OpPut, w)
err = db.put(w)
if err != nil {
return nil, err
}
Expand All @@ -126,10 +130,12 @@ func (dm *DMap) GetPut(key string, value interface{}) (interface{}, error) {
return nil, err
}
w := &writeop{
dmap: dm.name,
key: key,
value: val,
timestamp: time.Now().UnixNano(),
opcode: protocol.OpPut,
replicaOpcode: protocol.OpPutReplica,
dmap: dm.name,
key: key,
value: val,
timestamp: time.Now().UnixNano(),
}
rawval, err := dm.db.getPut(w)
if err != nil {
Expand All @@ -156,9 +162,11 @@ func (db *Olric) exIncrDecrOperation(req *protocol.Message) *protocol.Message {
op = "decr"
}
w := &writeop{
dmap: req.DMap,
key: req.Key,
timestamp: time.Now().UnixNano(),
opcode: protocol.OpPut,
replicaOpcode: protocol.OpPutReplica,
dmap: req.DMap,
key: req.Key,
timestamp: time.Now().UnixNano(),
}
newval, err := db.atomicIncrDecr(op, w, delta.(int))
if err != nil {
Expand All @@ -176,10 +184,12 @@ func (db *Olric) exIncrDecrOperation(req *protocol.Message) *protocol.Message {

func (db *Olric) exGetPutOperation(req *protocol.Message) *protocol.Message {
w := &writeop{
dmap: req.DMap,
key: req.Key,
value: req.Value,
timestamp: time.Now().UnixNano(),
opcode: protocol.OpPut,
replicaOpcode: protocol.OpPutReplica,
dmap: req.DMap,
key: req.Key,
value: req.Value,
timestamp: time.Now().UnixNano(),
}
oldval, err := db.getPut(w)
if err != nil {
Expand Down
112 changes: 81 additions & 31 deletions dmap_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/internal/discovery"

"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/storage"
)
Expand All @@ -36,24 +35,49 @@ var ErrWriteQuorum = errors.New("write quorum cannot be reached")

// writeop contains various values whose participate a write operation.
type writeop struct {
dmap string
key string
value []byte
timestamp int64
timeout time.Duration
flags int
opcode protocol.OpCode
replicaOpcode protocol.OpCode
dmap string
key string
value []byte
timestamp int64
timeout time.Duration
flags int
}

// fromReq generates a new protocol message from writeop instance.
func (w *writeop) fromReq(req *protocol.Message) {
w.dmap = req.DMap
w.key = req.Key
w.value = req.Value
if req.Op == protocol.OpPut {
w.opcode = req.Op

// Set opcode for a possible replica operation
switch w.opcode {
case protocol.OpPut:
w.replicaOpcode = protocol.OpPutReplica
case protocol.OpPutEx:
w.replicaOpcode = protocol.OpPutExReplica
case protocol.OpPutIf:
w.replicaOpcode = protocol.OpPutIfReplica
case protocol.OpPutIfEx:
w.replicaOpcode = protocol.OpPutIfExReplica
}

// Extract extras
switch req.Op {
case protocol.OpPut, protocol.OpPutReplica:
w.timestamp = req.Extra.(protocol.PutExtra).Timestamp
} else if req.Op == protocol.OpPutEx {
case protocol.OpPutEx, protocol.OpPutExReplica:
w.timestamp = req.Extra.(protocol.PutExExtra).Timestamp
w.timeout = time.Duration(req.Extra.(protocol.PutExExtra).TTL)
case protocol.OpPutIf, protocol.OpPutIfReplica:
w.flags = req.Extra.(protocol.PutIfExtra).Flags
w.timestamp = req.Extra.(protocol.PutIfExtra).Timestamp
case protocol.OpPutIfEx, protocol.OpPutIfExReplica:
w.flags = req.Extra.(protocol.PutIfExExtra).Flags
w.timestamp = req.Extra.(protocol.PutIfExExtra).Timestamp
w.timeout = time.Duration(req.Extra.(protocol.PutIfExExtra).TTL)
}
}

Expand All @@ -64,17 +88,30 @@ func (w *writeop) toReq(opcode protocol.OpCode) *protocol.Message {
Key: w.key,
Value: w.value,
}
if opcode == protocol.OpPut {

// Prepare extras
switch opcode {
case protocol.OpPut, protocol.OpPutReplica:
req.Extra = protocol.PutExtra{
Timestamp: w.timestamp,
}
} else if opcode == protocol.OpPutEx {
case protocol.OpPutEx, protocol.OpPutExReplica:
req.Extra = protocol.PutExExtra{
TTL: w.timeout.Nanoseconds(),
Timestamp: w.timestamp,
}
case protocol.OpPutIf, protocol.OpPutIfReplica:
req.Extra = protocol.PutIfExtra{
Flags: w.flags,
Timestamp: w.timestamp,
}
case protocol.OpPutIfEx, protocol.OpPutIfExReplica:
req.Extra = protocol.PutIfExExtra{
Flags: w.flags,
Timestamp: w.timestamp,
TTL: w.timeout.Nanoseconds(),
}
}

return req
}

Expand Down Expand Up @@ -104,14 +141,14 @@ func (db *Olric) localPut(hkey uint64, dm *dmap, w *writeop) error {
}

func (db *Olric) asyncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error {
req := w.toReq(protocol.OpPutExReplica)
req := w.toReq(w.replicaOpcode)
// Fire and forget mode.
owners := db.getBackupPartitionOwners(hkey)
for _, owner := range owners {
db.wg.Add(1)
go func(host discovery.Host) {
defer db.wg.Done()
_, err := db.requestTo(host.String(), protocol.OpPutExReplica, req)
_, err := db.requestTo(host.String(), w.replicaOpcode, req)
if err != nil {
if db.log.V(3).Ok() {
db.log.V(3).Printf("[ERROR] Failed to create replica in async mode: %v", err)
Expand All @@ -123,13 +160,13 @@ func (db *Olric) asyncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error {
}

func (db *Olric) syncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error {
req := w.toReq(protocol.OpPutExReplica)
req := w.toReq(w.replicaOpcode)

// Quorum based replication.
var successful int
owners := db.getBackupPartitionOwners(hkey)
for _, owner := range owners {
_, err := db.requestTo(owner.String(), protocol.OpPutExReplica, req)
_, err := db.requestTo(owner.String(), w.replicaOpcode, req)
if err != nil {
if db.log.V(3).Ok() {
db.log.V(3).Printf("[ERROR] Failed to call put command on %s for DMap: %s: %v", owner, w.dmap, err)
Expand Down Expand Up @@ -204,77 +241,90 @@ func (db *Olric) callPutOnCluster(hkey uint64, w *writeop) error {

// put controls every write operation in Olric. It redirects the requests to its owner,
// if the key belongs to another host.
func (db *Olric) put(opcode protocol.OpCode, w *writeop) error {
func (db *Olric) put(w *writeop) error {
member, hkey := db.locateKey(w.dmap, w.key)
if hostCmp(member, db.this) {
// We are on the partition owner.
return db.callPutOnCluster(hkey, w)
}
// Redirect to the partition owner.
req := w.toReq(opcode)
_, err := db.requestTo(member.String(), opcode, req)
req := w.toReq(w.opcode)
_, err := db.requestTo(member.String(), w.opcode, req)
return err
}

func (dm *DMap) prepareWriteop(key string, value interface{}, timeout time.Duration, flags int) (*writeop, error) {
func (dm *DMap) prepareWriteop(opcode protocol.OpCode, key string,
value interface{}, timeout time.Duration, flags int) (*writeop, error) {
val, err := dm.db.serializer.Marshal(value)
if err != nil {
return nil, err
}
return &writeop{
w := &writeop{
opcode: opcode,
dmap: dm.name,
key: key,
value: val,
timestamp: time.Now().UnixNano(),
timeout: timeout,
flags: flags,
}, nil
}
switch {
case opcode == protocol.OpPut:
w.replicaOpcode = protocol.OpPutReplica
case opcode == protocol.OpPutEx:
w.replicaOpcode = protocol.OpPutExReplica
case opcode == protocol.OpPutIf:
w.replicaOpcode = protocol.OpPutIfReplica
case opcode == protocol.OpPutIfEx:
w.replicaOpcode = protocol.OpPutIfExReplica
}
return w, nil
}

// PutEx sets the value for the given key with TTL. It overwrites any previous
// value for that key. It's thread-safe. The key has to be string. Value type
// is arbitrary. It is safe to modify the contents of the arguments after
// Put returns but not before.
func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) error {
w, err := dm.prepareWriteop(key, value, timeout, 0)
w, err := dm.prepareWriteop(protocol.OpPutEx, key, value, timeout, 0)
if err != nil {
return err
}
return dm.db.put(protocol.OpPutEx, w)
return dm.db.put(w)
}

// Put sets the value for the given key. It overwrites any previous value
// for that key and it's thread-safe. The key has to be string. Value type
// is arbitrary. It is safe to modify the contents of the arguments after
// Put returns but not before.
func (dm *DMap) Put(key string, value interface{}) error {
w, err := dm.prepareWriteop(key, value, nilTimeout, 0)
w, err := dm.prepareWriteop(protocol.OpPut, key, value, nilTimeout, 0)
if err != nil {
return err
}
return dm.db.put(protocol.OpPut, w)
return dm.db.put(w)
}

func (dm *DMap) PutIf(key string, value interface{}, flags int) error {
w, err := dm.prepareWriteop(key, value, nilTimeout, flags)
w, err := dm.prepareWriteop(protocol.OpPutIf, key, value, nilTimeout, flags)
if err != nil {
return err
}
return dm.db.put(protocol.OpPut, w)
return dm.db.put(w)
}

func (dm *DMap) PutIfEx(key string, value interface{}, timeout time.Duration, flags int) error {
w, err := dm.prepareWriteop(key, value, timeout, flags)
w, err := dm.prepareWriteop(protocol.OpPutIfEx, key, value, timeout, flags)
if err != nil {
return err
}
return dm.db.put(protocol.OpPut, w)
return dm.db.put(w)
}

func (db *Olric) exPutOperation(req *protocol.Message) *protocol.Message {
w := &writeop{}
w.fromReq(req)
return db.prepareResponse(req, db.put(req.Op, w))
return db.prepareResponse(req, db.put(w))
}

func (db *Olric) putReplicaOperation(req *protocol.Message) *protocol.Message {
Expand Down
27 changes: 26 additions & 1 deletion internal/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ type OpCode uint8

// ops
const (
OpPut OpCode = OpCode(iota)
OpPut OpCode = OpCode(iota) + 1
OpPutEx
OpPutIf
OpPutIfEx
OpGet
OpDelete
OpDestroy
Expand All @@ -59,7 +61,9 @@ const (
OpGetPut
OpUpdateRouting
OpPutReplica
OpPutIfReplica
OpPutExReplica
OpPutIfExReplica
OpDeletePrev
OpGetPrev
OpGetBackup
Expand Down Expand Up @@ -130,6 +134,19 @@ type PutExExtra struct {
Timestamp int64
}

// PutIfExtra defines extra values for this operation.
type PutIfExtra struct {
Flags int
Timestamp int64
}

// PutIfExExtra defines extra values for this operation.
type PutIfExExtra struct {
Flags int
Timestamp int64
TTL int64
}

// DMapCountOnPartExtra defines extra values for this operation.
type DMapCountOnPartExtra struct {
PartID uint64
Expand Down Expand Up @@ -187,6 +204,14 @@ func loadExtras(raw []byte, op OpCode) (interface{}, error) {
extra := ExpireExtra{}
err := binary.Read(bytes.NewReader(raw), binary.BigEndian, &extra)
return extra, err
case OpPutIfEx, OpPutIfExReplica:
extra := PutIfExExtra{}
err := binary.Read(bytes.NewReader(raw), binary.BigEndian, &extra)
return extra, err
case OpPutIf, OpPutIfReplica:
extra := PutIfExExtra{}
err := binary.Read(bytes.NewReader(raw), binary.BigEndian, &extra)
return extra, err
default:
// Programming error
return nil, fmt.Errorf("given OpCode: %v doesn't have extras", op)
Expand Down
Loading

0 comments on commit dbcbc8d

Please sign in to comment.