Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

35 Added the support for FILTERKEYS #84

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions IStorageEngines/diceKV.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package istorageengines

Comment on lines +1 to +2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name of the package should be storage_engines

import (
"sync"

"github.com/dicedb/dice/object"
)

type IKVStorage interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename the interface and keep it StorageEngine

Put(key string, obj *object.Obj)
Get(key string) *object.Obj
Del(key string) bool
GetCount() uint64
GetStorage() *sync.Map
Comment on lines +13 to +14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename it to Count and Storage ... more go-like. and as discussed, the Storage should return an iterator - a function that upon invocation returns the next KV pair.

// GetExpiry()
}
4 changes: 4 additions & 0 deletions IStorageEngines/redisKV.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package istorageengines

// This is for future usage to use Redis Storage
// as the main storage engine instead of KV
Comment on lines +1 to +4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove this file.

13 changes: 8 additions & 5 deletions core/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,31 @@ import (
"os"
"strings"

dbEngine "github.com/dicedb/dice/IStorageEngines"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dbEngine -> storage_engine

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/object"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of creating another package for object see if we can fit it in storage_engine or core

)

// TODO: Support Expiration
// TODO: Support non-kv data structures
// TODO: Support sync write
func dumpKey(fp *os.File, key string, obj *Obj) {
func dumpKey(fp *os.File, key string, obj *object.Obj) {
cmd := fmt.Sprintf("SET %s %s", key, obj.Value)
tokens := strings.Split(cmd, " ")
fp.Write(Encode(tokens, false))
}

// TODO: To to new and switch
func DumpAllAOF() {
func DumpAllAOF(dh dbEngine.IKVStorage) {
fp, err := os.OpenFile(config.AOFFile, os.O_CREATE|os.O_WRONLY, os.ModeAppend)
if err != nil {
fmt.Print("error", err)
return
}
log.Println("rewriting AOF file at", config.AOFFile)
for k, obj := range store {
dumpKey(fp, k, obj)
}
dh.GetStorage().Range(func(k, v interface{}) bool {
dumpKey(fp, k.(string), v.(*object.Obj))
return true
})
log.Println("AOF file rewrite complete")
}
6 changes: 4 additions & 2 deletions core/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"syscall"

"github.com/dicedb/dice/handlers"
)

type Client struct {
Expand All @@ -26,13 +28,13 @@ func (c *Client) TxnBegin() {
c.isTxn = true
}

func (c *Client) TxnExec() []byte {
func (c *Client) TxnExec(dh *handlers.DiceKVstoreHandler) []byte {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of calling it handler, let's name our storage engine and use that here.

I propose the name - ozone

var out []byte
buf := bytes.NewBuffer(out)

buf.WriteString(fmt.Sprintf("*%d\r\n", len(c.cqueue)))
for _, _cmd := range c.cqueue {
buf.Write(executeCommand(_cmd, c))
buf.Write(executeCommand(_cmd, c, dh))
}

c.cqueue = make(RedisCmds, 0)
Expand Down
127 changes: 87 additions & 40 deletions core/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,17 @@ import (
"bytes"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/dicedb/dice/eviction"
"github.com/dicedb/dice/handlers"
"github.com/dicedb/dice/object"
"github.com/dicedb/dice/pool"
)

var RESP_NIL []byte = []byte("$-1\r\n")
Expand Down Expand Up @@ -39,7 +47,7 @@ func evalPING(args []string) []byte {
return b
}

func evalSET(args []string) []byte {
func evalSET(args []string, dh *handlers.DiceKVstoreHandler) []byte {
if len(args) <= 1 {
return Encode(errors.New("ERR wrong number of arguments for 'set' command"), false)
}
Expand Down Expand Up @@ -69,50 +77,88 @@ func evalSET(args []string) []byte {
}

// putting the k and value in a Hash Table
Put(key, NewObj(value, exDurationMs, oType, oEnc))
dh.Put(key, object.NewObj(value, exDurationMs, oType, oEnc))
return RESP_OK
}

func evalGET(args []string) []byte {
func compileToken(regex string) (matcher *regexp.Regexp, err error) {
// NOTE: It is assumed that users won't pass * as part of the
// string but only as wildcard
tokenNormalized := strings.ReplaceAll(regex, "*", ".*")
if matcher, err = regexp.Compile(tokenNormalized); err != nil {
matcher = nil
}
return
}

func evalFILTERKEYS(args []string, dh *handlers.DiceKVstoreHandler) []byte {
if len(args) != 1 {
return Encode(errors.New("ERR wrong number of arguments for 'FILTERKEYS' command"), false)
}
var tokenKey string = args[0]

// Get the regex compiled on the token
matcher, err := compileToken(tokenKey)
if err != nil {
return Encode(errors.New("ERR key is malformed in command"), false)
}

var allData sync.Map
// Define the worker Job
diceWorker := pool.NewDiceWorker(func(i interface{}) {
buf := i.(object.DiceWorkerBuffer)
// fmt.Printf("{Key: %v, Value: %v}\n", buf.Key, (*buf.Value).Value)
key := buf.Key
val := buf.Value.Value
if matcher.MatchString(key) {
allData.Store(key, val)
// fmt.Printf("{Key: %v, Val: %v}\n", key, val)
}
})
// Spawn the worker
diceWorker.Work(dh)
return Encode(allData, false)
}

func evalGET(args []string, dh *handlers.DiceKVstoreHandler) []byte {
if len(args) != 1 {
return Encode(errors.New("ERR wrong number of arguments for 'get' command"), false)
}

var key string = args[0]

// Get the key from the hash table
obj := Get(key)
obj := dh.Get(key)

// if key does not exist, return RESP encoded nil
if obj == nil {
return RESP_NIL
}

// if key already expired then return nil
if hasExpired(obj) {
if object.GetDiceExpiryStore().HasExpired(obj) {
return RESP_NIL
}

// return the RESP encoded value
return Encode(obj.Value, false)
}

func evalTTL(args []string) []byte {
func evalTTL(args []string, dh *handlers.DiceKVstoreHandler) []byte {
if len(args) != 1 {
return Encode(errors.New("ERR wrong number of arguments for 'ttl' command"), false)
}

var key string = args[0]

obj := Get(key)
obj := dh.Get(key)

// if key does not exist, return RESP encoded -2 denoting key does not exist
if obj == nil {
return RESP_MINUS_2
}

// if object exist, but no expiration is set on it then send -1
exp, isExpirySet := getExpiry(obj)
exp, isExpirySet := object.GetDiceExpiryStore().GetExpiry(obj)
if !isExpirySet {
return RESP_MINUS_1
}
Expand All @@ -129,19 +175,19 @@ func evalTTL(args []string) []byte {
return Encode(int64(durationMs/1000), false)
}

func evalDEL(args []string) []byte {
func evalDEL(args []string, dh *handlers.DiceKVstoreHandler) []byte {
var countDeleted int = 0

for _, key := range args {
if ok := Del(key); ok {
if ok := dh.Del(key); ok {
countDeleted++
}
}

return Encode(countDeleted, false)
}

func evalEXPIRE(args []string) []byte {
func evalEXPIRE(args []string, dh *handlers.DiceKVstoreHandler) []byte {
if len(args) <= 1 {
return Encode(errors.New("ERR wrong number of arguments for 'expire' command"), false)
}
Expand All @@ -152,14 +198,14 @@ func evalEXPIRE(args []string) []byte {
return Encode(errors.New("ERR value is not an integer or out of range"), false)
}

obj := Get(key)
obj := dh.Get(key)

// 0 if the timeout was not set. e.g. key doesn't exist, or operation skipped due to the provided arguments
if obj == nil {
return RESP_ZERO
}

setExpiry(obj, exDurationSec*1000)
object.GetDiceExpiryStore().SetExpiry(obj, exDurationSec*100)

// 1 if the timeout was set.
return RESP_ONE
Expand All @@ -170,39 +216,39 @@ based on CoW optimization and Fork */
// TODO: Implement Acknowledgement so that main process could know whether child has finished writing to its AOF file or not.
// TODO: Make it safe from failure, an stable policy would be to write the new flushes to a temporary files and then rename them to the main process's AOF file
// TODO: Add fsync() and fdatasync() to persist to AOF for above cases.
func evalBGREWRITEAOF(args []string) []byte {
func evalBGREWRITEAOF(args []string, dh *handlers.DiceKVstoreHandler) []byte {
// Fork a child process, this child process would inherit all the uncommitted pages from main process.
// This technique utilizes the CoW or copy-on-write, so while the main process is free to modify them
// the child would save all the pages to disk.
// Check details here -https://www.sobyte.net/post/2022-10/fork-cow/
newChild, _, _ := syscall.Syscall(syscall.SYS_FORK, 0, 0, 0)
if newChild == 0 {
//We are inside child process now, so we'll start flushing to disk.
DumpAllAOF()
DumpAllAOF(dh)
return []byte("")
} else {
//Back to main thread
return RESP_OK
}
}

func evalINCR(args []string) []byte {
func evalINCR(args []string, dh *handlers.DiceKVstoreHandler) []byte {
if len(args) != 1 {
return Encode(errors.New("ERR wrong number of arguments for 'incr' command"), false)
}

var key string = args[0]
obj := Get(key)
obj := dh.Get(key)
if obj == nil {
obj = NewObj("0", -1, OBJ_TYPE_STRING, OBJ_ENCODING_INT)
Put(key, obj)
obj = object.NewObj("0", -1, object.OBJ_TYPE_STRING, object.OBJ_ENCODING_INT)
dh.Put(key, obj)
}

if err := assertType(obj.TypeEncoding, OBJ_TYPE_STRING); err != nil {
if err := assertType(obj.TypeEncoding, object.OBJ_TYPE_STRING); err != nil {
return Encode(err, false)
}

if err := assertEncoding(obj.TypeEncoding, OBJ_ENCODING_INT); err != nil {
if err := assertEncoding(obj.TypeEncoding, object.OBJ_ENCODING_INT); err != nil {
return Encode(err, false)
}

Expand Down Expand Up @@ -231,8 +277,8 @@ func evalLATENCY(args []string) []byte {
return Encode([]string{}, false)
}

func evalLRU(args []string) []byte {
evictAllkeysLRU()
func evalLRU(args []string, dh *handlers.DiceKVstoreHandler) []byte {
eviction.EvictAllkeysLRU(dh)
return RESP_OK
}

Expand All @@ -253,32 +299,34 @@ func evalMULTI(args []string) []byte {
return RESP_OK
}

func executeCommand(cmd *RedisCmd, c *Client) []byte {
func executeCommand(cmd *RedisCmd, c *Client, dh *handlers.DiceKVstoreHandler) []byte {
switch cmd.Cmd {
case "PING":
return evalPING(cmd.Args)
case "SET":
return evalSET(cmd.Args)
return evalSET(cmd.Args, dh)
case "GET":
return evalGET(cmd.Args)
return evalGET(cmd.Args, dh)
case "FILTERKEYS":
return evalFILTERKEYS(cmd.Args, dh)
case "TTL":
return evalTTL(cmd.Args)
return evalTTL(cmd.Args, dh)
case "DEL":
return evalDEL(cmd.Args)
return evalDEL(cmd.Args, dh)
case "EXPIRE":
return evalEXPIRE(cmd.Args)
return evalEXPIRE(cmd.Args, dh)
case "BGREWRITEAOF":
return evalBGREWRITEAOF(cmd.Args)
return evalBGREWRITEAOF(cmd.Args, dh)
case "INCR":
return evalINCR(cmd.Args)
return evalINCR(cmd.Args, dh)
case "INFO":
return evalINFO(cmd.Args)
case "CLIENT":
return evalCLIENT(cmd.Args)
case "LATENCY":
return evalLATENCY(cmd.Args)
case "LRU":
return evalLRU(cmd.Args)
return evalLRU(cmd.Args, dh)
case "SLEEP":
return evalSLEEP(cmd.Args)
case "MULTI":
Expand All @@ -288,7 +336,7 @@ func executeCommand(cmd *RedisCmd, c *Client) []byte {
if !c.isTxn {
return Encode(errors.New("ERR EXEC without MULTI"), false)
}
return c.TxnExec()
return c.TxnExec(dh)
case "DISCARD":
if !c.isTxn {
return Encode(errors.New("ERR DISCARD without MULTI"), false)
Expand All @@ -300,19 +348,18 @@ func executeCommand(cmd *RedisCmd, c *Client) []byte {
}
}

func executeCommandToBuffer(cmd *RedisCmd, buf *bytes.Buffer, c *Client) {
buf.Write(executeCommand(cmd, c))
func executeCommandToBuffer(cmd *RedisCmd, buf *bytes.Buffer, c *Client, dh *handlers.DiceKVstoreHandler) {
buf.Write(executeCommand(cmd, c, dh))
}

func EvalAndRespond(cmds RedisCmds, c *Client) {
func EvalAndRespond(cmds RedisCmds, c *Client, dh *handlers.DiceKVstoreHandler) {
var response []byte
buf := bytes.NewBuffer(response)

for _, cmd := range cmds {
// if txn is not in progress, then we can simply
// execute the command and add the response to the buffer
if !c.isTxn {
executeCommandToBuffer(cmd, buf, c)
executeCommandToBuffer(cmd, buf, c, dh)
continue
}

Expand All @@ -326,7 +373,7 @@ func EvalAndRespond(cmds RedisCmds, c *Client) {
// if txn is active and the command is non-queuable
// ex: EXEC, DISCARD
// we execute the command and gather the response in buffer
executeCommandToBuffer(cmd, buf, c)
executeCommandToBuffer(cmd, buf, c, dh)
}
}
c.Write(buf.Bytes())
Expand Down
Loading