Skip to content

Commit

Permalink
Initial implementation of Expire command
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Oct 30, 2019
1 parent 321940c commit f13e5ed
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 2 deletions.
3 changes: 3 additions & 0 deletions dmap_eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ func (db *Olric) evictKeyWithLRU(dm *dmap, name string) error {
item := items[0]
vdata, err := dm.storage.Get(item.HKey)
if err != nil {
if err == storage.ErrKeyNotFound {
err = ErrKeyNotFound
}
return err
}
return db.delKeyVal(dm, item.HKey, name, vdata.Key)
Expand Down
90 changes: 90 additions & 0 deletions dmap_expire.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2019 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package olric

import (
"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/storage"
"time"
)

func (db *Olric) localExpire(hkey uint64, dm *dmap, w *writeop) error {
ttl := getTTL(w.timeout)
val := &storage.VData{
Timestamp: w.timestamp,
TTL: ttl,
}
err := dm.storage.UpdateTTL(hkey, val)
if err != nil {
if err == storage.ErrKeyNotFound {
err = ErrKeyNotFound
}
return err
}
dm.updateAccessLog(hkey)
return nil
}

func (db *Olric) callExpireOnCluster(hkey uint64, w *writeop) error {
// Get the DMap and acquire its lock
dm, err := db.getDMap(w.dmap, hkey)
if err != nil {
return err
}
dm.Lock()
defer dm.Unlock()

if db.config.ReplicaCount == config.MinimumReplicaCount {
// MinimumReplicaCount is 1. So it's enough to put the key locally. There is no
// other replica host.
return db.localExpire(hkey, dm, w)
}
return nil
}

func (db *Olric) expire(w *writeop) error {
member, hkey := db.locateKey(w.dmap, w.key)
if hostCmp(member, db.this) {
// We are on the partition owner.
return db.callExpireOnCluster(hkey, w)
}
// Redirect to the partition owner
req := &protocol.Message{
DMap: w.dmap,
Key: w.key,
}
_, err := db.requestTo(member.String(), protocol.OpExpire, req)
if err != nil {
return err
}
return nil
}

func (db *Olric) exExpireOperation(req *protocol.Message) *protocol.Message {
w := &writeop{}
w.fromReq(req)
return db.prepareResponse(req, db.expire(w))
}

func (dm *DMap) Expire(key string, timeout time.Duration) error {
w := &writeop{
dmap: dm.name,
key: key,
timestamp: time.Now().UnixNano(),
timeout: timeout,
}
return dm.db.expire(w)
}
93 changes: 93 additions & 0 deletions dmap_expire_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2019 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package olric

import (
"context"
"sync/atomic"
"testing"
"time"
)

func TestDMap_SetExpireStandalone(t *testing.T) {
db, err := newDB(testStandaloneConfig())
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
defer func() {
err = db.Shutdown(context.Background())
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to shutdown Olric: %v", err)
}
}()

key := "mykey"
// Create a new DMap object and put a K/V pair.
dm, err := db.NewDMap("foobar")
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
err = dm.Put(key, "value")
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

// Get the value and check it.
_, err = dm.Get(key)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

err = dm.Expire(key, time.Millisecond)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

<-time.After(5 * time.Millisecond)
// Update currentUnixNano to evict the key now.
atomic.StoreInt64(&currentUnixNano, time.Now().UnixNano())

// Get the value and check it.
_, err = dm.Get(key)
if err != ErrKeyNotFound {
t.Fatalf("Expected ErrKeyNotFound. Got: %v", err)
}
}


func TestDMap_SetExpireKeyNotFound(t *testing.T) {
db, err := newDB(testStandaloneConfig())
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
defer func() {
err = db.Shutdown(context.Background())
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to shutdown Olric: %v", err)
}
}()

key := "mykey"
// Create a new DMap object and put a K/V pair.
dm, err := db.NewDMap("foobar")
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

err = dm.Expire(key, time.Millisecond)
if err != ErrKeyNotFound {
t.Fatalf("Expected ErrKeyNotFound. Got: %v", err)
}
}
2 changes: 2 additions & 0 deletions internal/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ const (
OpPipeline
OpPing
OpStats
OpExpire
OpExpireReplica
)

// StatusCode ...
Expand Down
22 changes: 21 additions & 1 deletion internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package storage

import (
"encoding/binary"

"github.com/pkg/errors"
"github.com/vmihailenco/msgpack"
)
Expand Down Expand Up @@ -199,6 +198,27 @@ func (s *Storage) Delete(hkey uint64) error {
return nil
}

func (s *Storage) UpdateTTL(hkey uint64, data *VData) error {
if len(s.tables) == 0 {
panic("tables cannot be empty")
}

// Scan available tables by starting the last added table.
for i := len(s.tables) - 1; i >= 0; i-- {
t := s.tables[i]
prev := t.updateTTL(hkey, data)
if prev {
// Try out the other tables.
continue
}
// Found the key, return the stored value with its metadata.
return nil
}
// Nothing here.
return ErrKeyNotFound
}


type transport struct {
HKeys map[uint64]int
Memory []byte
Expand Down
22 changes: 21 additions & 1 deletion internal/storage/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package storage

import (
"encoding/binary"

"github.com/pkg/errors"
)

Expand Down Expand Up @@ -204,3 +203,24 @@ func (t *table) delete(hkey uint64) bool {
t.inuse -= garbage
return false
}

func (t *table) updateTTL(hkey uint64, value *VData) bool {
offset, ok := t.hkeys[hkey]
if !ok {
// Try the previous table.
return true
}

// Key, 1 byte for key size, klen for key's actual length.
klen := int(uint8(t.memory[offset]))
offset += 1 + klen

// Set the new TTL. It's 8 bytes.
binary.BigEndian.PutUint64(t.memory[offset:], uint64(value.TTL))
offset += 8

// Set the new Timestamp. It's 8 bytes.
binary.BigEndian.PutUint64(t.memory[offset:], uint64(value.Timestamp))
return false
}

3 changes: 3 additions & 0 deletions olric.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ func (db *Olric) registerOperations() {
// Pipeline
db.server.RegisterOperation(protocol.OpPipeline, db.pipelineOperation)

// Expire
db.server.RegisterOperation(protocol.OpExpire, db.exExpireOperation)

// Internal
db.server.RegisterOperation(protocol.OpUpdateRouting, db.updateRoutingOperation)
db.server.RegisterOperation(protocol.OpMoveDMap, db.moveDMapOperation)
Expand Down

0 comments on commit f13e5ed

Please sign in to comment.