diff --git a/dmap_atomic.go b/dmap_atomic.go index e417a435..3f033c89 100644 --- a/dmap_atomic.go +++ b/dmap_atomic.go @@ -68,7 +68,7 @@ func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error) return 0, err } w.value = nval - err = db.put(protocol.OpPut, w) + err = db.put(w) if err != nil { return 0, err } @@ -78,9 +78,11 @@ func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error) // Incr atomically increments key by delta. The return value is the new value after being incremented or an error. func (dm *DMap) Incr(key string, delta int) (int, error) { w := &writeop{ - dmap: dm.name, - key: key, - timestamp: time.Now().UnixNano(), + opcode: protocol.OpPut, + replicaOpcode: protocol.OpPutReplica, + dmap: dm.name, + key: key, + timestamp: time.Now().UnixNano(), } return dm.db.atomicIncrDecr("incr", w, delta) } @@ -88,9 +90,11 @@ func (dm *DMap) Incr(key string, delta int) (int, error) { // Decr atomically decrements key by delta. The return value is the new value after being decremented or an error. func (dm *DMap) Decr(key string, delta int) (int, error) { w := &writeop{ - dmap: dm.name, - key: key, - timestamp: time.Now().UnixNano(), + opcode: protocol.OpPut, + replicaOpcode: protocol.OpPutReplica, + dmap: dm.name, + key: key, + timestamp: time.Now().UnixNano(), } return dm.db.atomicIncrDecr("decr", w, delta) } @@ -109,7 +113,7 @@ func (db *Olric) getPut(w *writeop) ([]byte, error) { if err != nil && err != ErrKeyNotFound { return nil, err } - err = db.put(protocol.OpPut, w) + err = db.put(w) if err != nil { return nil, err } @@ -126,10 +130,12 @@ func (dm *DMap) GetPut(key string, value interface{}) (interface{}, error) { return nil, err } w := &writeop{ - dmap: dm.name, - key: key, - value: val, - timestamp: time.Now().UnixNano(), + opcode: protocol.OpPut, + replicaOpcode: protocol.OpPutReplica, + dmap: dm.name, + key: key, + value: val, + timestamp: time.Now().UnixNano(), } rawval, err := dm.db.getPut(w) if err != nil { @@ -156,9 +162,11 @@ func (db *Olric) exIncrDecrOperation(req *protocol.Message) *protocol.Message { op = "decr" } w := &writeop{ - dmap: req.DMap, - key: req.Key, - timestamp: time.Now().UnixNano(), + opcode: protocol.OpPut, + replicaOpcode: protocol.OpPutReplica, + dmap: req.DMap, + key: req.Key, + timestamp: time.Now().UnixNano(), } newval, err := db.atomicIncrDecr(op, w, delta.(int)) if err != nil { @@ -176,10 +184,12 @@ func (db *Olric) exIncrDecrOperation(req *protocol.Message) *protocol.Message { func (db *Olric) exGetPutOperation(req *protocol.Message) *protocol.Message { w := &writeop{ - dmap: req.DMap, - key: req.Key, - value: req.Value, - timestamp: time.Now().UnixNano(), + opcode: protocol.OpPut, + replicaOpcode: protocol.OpPutReplica, + dmap: req.DMap, + key: req.Key, + value: req.Value, + timestamp: time.Now().UnixNano(), } oldval, err := db.getPut(w) if err != nil { diff --git a/dmap_put.go b/dmap_put.go index 82bc5aee..4cfa41dc 100644 --- a/dmap_put.go +++ b/dmap_put.go @@ -22,7 +22,6 @@ import ( "github.com/buraksezer/olric/config" "github.com/buraksezer/olric/internal/discovery" - "github.com/buraksezer/olric/internal/protocol" "github.com/buraksezer/olric/internal/storage" ) @@ -36,12 +35,14 @@ var ErrWriteQuorum = errors.New("write quorum cannot be reached") // writeop contains various values whose participate a write operation. type writeop struct { - dmap string - key string - value []byte - timestamp int64 - timeout time.Duration - flags int + opcode protocol.OpCode + replicaOpcode protocol.OpCode + dmap string + key string + value []byte + timestamp int64 + timeout time.Duration + flags int } // fromReq generates a new protocol message from writeop instance. @@ -49,11 +50,34 @@ func (w *writeop) fromReq(req *protocol.Message) { w.dmap = req.DMap w.key = req.Key w.value = req.Value - if req.Op == protocol.OpPut { + w.opcode = req.Op + + // Set opcode for a possible replica operation + switch w.opcode { + case protocol.OpPut: + w.replicaOpcode = protocol.OpPutReplica + case protocol.OpPutEx: + w.replicaOpcode = protocol.OpPutExReplica + case protocol.OpPutIf: + w.replicaOpcode = protocol.OpPutIfReplica + case protocol.OpPutIfEx: + w.replicaOpcode = protocol.OpPutIfExReplica + } + + // Extract extras + switch req.Op { + case protocol.OpPut, protocol.OpPutReplica: w.timestamp = req.Extra.(protocol.PutExtra).Timestamp - } else if req.Op == protocol.OpPutEx { + case protocol.OpPutEx, protocol.OpPutExReplica: w.timestamp = req.Extra.(protocol.PutExExtra).Timestamp w.timeout = time.Duration(req.Extra.(protocol.PutExExtra).TTL) + case protocol.OpPutIf, protocol.OpPutIfReplica: + w.flags = req.Extra.(protocol.PutIfExtra).Flags + w.timestamp = req.Extra.(protocol.PutIfExtra).Timestamp + case protocol.OpPutIfEx, protocol.OpPutIfExReplica: + w.flags = req.Extra.(protocol.PutIfExExtra).Flags + w.timestamp = req.Extra.(protocol.PutIfExExtra).Timestamp + w.timeout = time.Duration(req.Extra.(protocol.PutIfExExtra).TTL) } } @@ -64,17 +88,30 @@ func (w *writeop) toReq(opcode protocol.OpCode) *protocol.Message { Key: w.key, Value: w.value, } - if opcode == protocol.OpPut { + + // Prepare extras + switch opcode { + case protocol.OpPut, protocol.OpPutReplica: req.Extra = protocol.PutExtra{ Timestamp: w.timestamp, } - } else if opcode == protocol.OpPutEx { + case protocol.OpPutEx, protocol.OpPutExReplica: req.Extra = protocol.PutExExtra{ TTL: w.timeout.Nanoseconds(), Timestamp: w.timestamp, } + case protocol.OpPutIf, protocol.OpPutIfReplica: + req.Extra = protocol.PutIfExtra{ + Flags: w.flags, + Timestamp: w.timestamp, + } + case protocol.OpPutIfEx, protocol.OpPutIfExReplica: + req.Extra = protocol.PutIfExExtra{ + Flags: w.flags, + Timestamp: w.timestamp, + TTL: w.timeout.Nanoseconds(), + } } - return req } @@ -104,14 +141,14 @@ func (db *Olric) localPut(hkey uint64, dm *dmap, w *writeop) error { } func (db *Olric) asyncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error { - req := w.toReq(protocol.OpPutExReplica) + req := w.toReq(w.replicaOpcode) // Fire and forget mode. owners := db.getBackupPartitionOwners(hkey) for _, owner := range owners { db.wg.Add(1) go func(host discovery.Host) { defer db.wg.Done() - _, err := db.requestTo(host.String(), protocol.OpPutExReplica, req) + _, err := db.requestTo(host.String(), w.replicaOpcode, req) if err != nil { if db.log.V(3).Ok() { db.log.V(3).Printf("[ERROR] Failed to create replica in async mode: %v", err) @@ -123,13 +160,13 @@ func (db *Olric) asyncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error { } func (db *Olric) syncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error { - req := w.toReq(protocol.OpPutExReplica) + req := w.toReq(w.replicaOpcode) // Quorum based replication. var successful int owners := db.getBackupPartitionOwners(hkey) for _, owner := range owners { - _, err := db.requestTo(owner.String(), protocol.OpPutExReplica, req) + _, err := db.requestTo(owner.String(), w.replicaOpcode, req) if err != nil { if db.log.V(3).Ok() { db.log.V(3).Printf("[ERROR] Failed to call put command on %s for DMap: %s: %v", owner, w.dmap, err) @@ -204,31 +241,44 @@ func (db *Olric) callPutOnCluster(hkey uint64, w *writeop) error { // put controls every write operation in Olric. It redirects the requests to its owner, // if the key belongs to another host. -func (db *Olric) put(opcode protocol.OpCode, w *writeop) error { +func (db *Olric) put(w *writeop) error { member, hkey := db.locateKey(w.dmap, w.key) if hostCmp(member, db.this) { // We are on the partition owner. return db.callPutOnCluster(hkey, w) } // Redirect to the partition owner. - req := w.toReq(opcode) - _, err := db.requestTo(member.String(), opcode, req) + req := w.toReq(w.opcode) + _, err := db.requestTo(member.String(), w.opcode, req) return err } -func (dm *DMap) prepareWriteop(key string, value interface{}, timeout time.Duration, flags int) (*writeop, error) { +func (dm *DMap) prepareWriteop(opcode protocol.OpCode, key string, + value interface{}, timeout time.Duration, flags int) (*writeop, error) { val, err := dm.db.serializer.Marshal(value) if err != nil { return nil, err } - return &writeop{ + w := &writeop{ + opcode: opcode, dmap: dm.name, key: key, value: val, timestamp: time.Now().UnixNano(), timeout: timeout, flags: flags, - }, nil + } + switch { + case opcode == protocol.OpPut: + w.replicaOpcode = protocol.OpPutReplica + case opcode == protocol.OpPutEx: + w.replicaOpcode = protocol.OpPutExReplica + case opcode == protocol.OpPutIf: + w.replicaOpcode = protocol.OpPutIfReplica + case opcode == protocol.OpPutIfEx: + w.replicaOpcode = protocol.OpPutIfExReplica + } + return w, nil } // PutEx sets the value for the given key with TTL. It overwrites any previous @@ -236,11 +286,11 @@ func (dm *DMap) prepareWriteop(key string, value interface{}, timeout time.Durat // is arbitrary. It is safe to modify the contents of the arguments after // Put returns but not before. func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) error { - w, err := dm.prepareWriteop(key, value, timeout, 0) + w, err := dm.prepareWriteop(protocol.OpPutEx, key, value, timeout, 0) if err != nil { return err } - return dm.db.put(protocol.OpPutEx, w) + return dm.db.put(w) } // Put sets the value for the given key. It overwrites any previous value @@ -248,33 +298,33 @@ func (dm *DMap) PutEx(key string, value interface{}, timeout time.Duration) erro // is arbitrary. It is safe to modify the contents of the arguments after // Put returns but not before. func (dm *DMap) Put(key string, value interface{}) error { - w, err := dm.prepareWriteop(key, value, nilTimeout, 0) + w, err := dm.prepareWriteop(protocol.OpPut, key, value, nilTimeout, 0) if err != nil { return err } - return dm.db.put(protocol.OpPut, w) + return dm.db.put(w) } func (dm *DMap) PutIf(key string, value interface{}, flags int) error { - w, err := dm.prepareWriteop(key, value, nilTimeout, flags) + w, err := dm.prepareWriteop(protocol.OpPutIf, key, value, nilTimeout, flags) if err != nil { return err } - return dm.db.put(protocol.OpPut, w) + return dm.db.put(w) } func (dm *DMap) PutIfEx(key string, value interface{}, timeout time.Duration, flags int) error { - w, err := dm.prepareWriteop(key, value, timeout, flags) + w, err := dm.prepareWriteop(protocol.OpPutIfEx, key, value, timeout, flags) if err != nil { return err } - return dm.db.put(protocol.OpPut, w) + return dm.db.put(w) } func (db *Olric) exPutOperation(req *protocol.Message) *protocol.Message { w := &writeop{} w.fromReq(req) - return db.prepareResponse(req, db.put(req.Op, w)) + return db.prepareResponse(req, db.put(w)) } func (db *Olric) putReplicaOperation(req *protocol.Message) *protocol.Message { diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index d447c0f0..17ce8a3e 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -47,8 +47,10 @@ type OpCode uint8 // ops const ( - OpPut OpCode = OpCode(iota) + OpPut OpCode = OpCode(iota) + 1 OpPutEx + OpPutIf + OpPutIfEx OpGet OpDelete OpDestroy @@ -59,7 +61,9 @@ const ( OpGetPut OpUpdateRouting OpPutReplica + OpPutIfReplica OpPutExReplica + OpPutIfExReplica OpDeletePrev OpGetPrev OpGetBackup @@ -130,6 +134,19 @@ type PutExExtra struct { Timestamp int64 } +// PutIfExtra defines extra values for this operation. +type PutIfExtra struct { + Flags int + Timestamp int64 +} + +// PutIfExExtra defines extra values for this operation. +type PutIfExExtra struct { + Flags int + Timestamp int64 + TTL int64 +} + // DMapCountOnPartExtra defines extra values for this operation. type DMapCountOnPartExtra struct { PartID uint64 @@ -187,6 +204,14 @@ func loadExtras(raw []byte, op OpCode) (interface{}, error) { extra := ExpireExtra{} err := binary.Read(bytes.NewReader(raw), binary.BigEndian, &extra) return extra, err + case OpPutIfEx, OpPutIfExReplica: + extra := PutIfExExtra{} + err := binary.Read(bytes.NewReader(raw), binary.BigEndian, &extra) + return extra, err + case OpPutIf, OpPutIfReplica: + extra := PutIfExExtra{} + err := binary.Read(bytes.NewReader(raw), binary.BigEndian, &extra) + return extra, err default: // Programming error return nil, fmt.Errorf("given OpCode: %v doesn't have extras", op) diff --git a/olric.go b/olric.go index 8bb9b297..1dafb6b2 100644 --- a/olric.go +++ b/olric.go @@ -333,6 +333,10 @@ func (db *Olric) registerOperations() { db.server.RegisterOperation(protocol.OpPutEx, db.exPutOperation) db.server.RegisterOperation(protocol.OpPutReplica, db.putReplicaOperation) db.server.RegisterOperation(protocol.OpPutExReplica, db.putReplicaOperation) + db.server.RegisterOperation(protocol.OpPutIf, db.exPutOperation) + db.server.RegisterOperation(protocol.OpPutIfEx, db.exPutOperation) + db.server.RegisterOperation(protocol.OpPutIfReplica, db.putReplicaOperation) + db.server.RegisterOperation(protocol.OpPutIfExReplica, db.putReplicaOperation) // Get db.server.RegisterOperation(protocol.OpGet, db.exGetOperation)