diff --git a/command/hashes.go b/command/hashes.go index dd3a7202..3ff8cb92 100644 --- a/command/hashes.go +++ b/command/hashes.go @@ -193,12 +193,17 @@ func HVals(ctx *Context, txn *db.Transaction) (OnCommit, error) { // HLen returns the number of fields contained in the hash stored at key func HLen(ctx *Context, txn *db.Transaction) (OnCommit, error) { + var len int64 key := []byte(ctx.Args[0]) hash, err := txn.Hash(key) if err != nil { return nil, err } - return Integer(ctx.Out, hash.HLen()), nil + len, err = hash.HLen() + if err != nil { + return nil, err + } + return Integer(ctx.Out, len), nil } // HStrLen returns the string length of the value associated with field in the hash stored at key @@ -265,3 +270,21 @@ func HMSet(ctx *Context, txn *db.Transaction) (OnCommit, error) { } return SimpleString(ctx.Out, "OK"), nil } + +// HMSlot +func HMSlot(ctx *Context, txn *db.Transaction) (OnCommit, error) { + key := []byte(ctx.Args[0]) + count, err := strconv.ParseInt(ctx.Args[1], 10, 64) + if err != nil || count < 0 { + return nil, ErrInteger + } + hash, err := txn.Hash(key) + if err != nil { + return nil, errors.New("ERR " + err.Error()) + } + + if err := hash.HMSlot(count); err != nil { + return nil, errors.New("ERR " + err.Error()) + } + return SimpleString(ctx.Out, "OK"), nil +} diff --git a/command/init.go b/command/init.go index 07f91cde..b6706f39 100644 --- a/command/init.go +++ b/command/init.go @@ -75,6 +75,7 @@ func init() { "hsetnx": HSetNX, "hmget": HMGet, "hmset": HMSet, + "hmslot": HMSlot, // sets "sadd": SAdd, @@ -169,6 +170,7 @@ func init() { "hsetnx": Desc{Proc: AutoCommit(HSetNX), Cons: Constraint{4, flags("wmF"), 1, 1, 1}}, "hmget": Desc{Proc: AutoCommit(HMGet), Cons: Constraint{-3, flags("rF"), 1, 1, 1}}, "hmset": Desc{Proc: AutoCommit(HMSet), Cons: Constraint{-3, flags("wmF"), 1, 1, 1}}, + "hmslot": Desc{Proc: AutoCommit(HMSlot), Cons: Constraint{3, flags("wF"), 1, 1, 1}}, // sets "sadd": Desc{Proc: AutoCommit(SAdd), Cons: Constraint{-3, flags("wmF"), 1, 1, 1}}, diff --git a/command/keys.go b/command/keys.go index 034c68a7..42722e1f 100644 --- a/command/keys.go +++ b/command/keys.go @@ -217,11 +217,20 @@ func Object(ctx *Context, txn *db.Transaction) (OnCommit, error) { } return nil, errors.New("ERR " + err.Error()) } - switch subCmd { case "refcount", "freq": return Integer(ctx.Out, 0), nil case "idletime": + if obj.Type == db.ObjectHash { + hash, err := txn.Hash(key) + if err != nil { + return nil, errors.New("ERR " + err.Error()) + } + obj, err = hash.Object() + if err != nil { + return nil, errors.New("ERR " + err.Error()) + } + } sec := int64(time.Since(time.Unix(0, obj.UpdatedAt)).Seconds()) return Integer(ctx.Out, sec), nil case "encoding": diff --git a/command/server.go b/command/server.go index 96223a02..683ce7bc 100644 --- a/command/server.go +++ b/command/server.go @@ -219,11 +219,21 @@ func Debug(ctx *Context, txn *db.Transaction) (OnCommit, error) { } } func debugObject(ctx *Context, txn *db.Transaction) (OnCommit, error) { - key := ctx.Args[1] - obj, err := txn.Object([]byte(key)) + key := []byte(ctx.Args[1]) + obj, err := txn.Object(key) if err != nil { return nil, err } + if obj.Type == db.ObjectHash { + hash, err := txn.Hash(key) + if err != nil { + return nil, errors.New("ERR " + err.Error()) + } + obj, err = hash.Object() + if err != nil { + return nil, errors.New("ERR " + err.Error()) + } + } return SimpleString(ctx.Out, obj.String()), nil } diff --git a/conf/config.go b/conf/config.go index 766eaeb0..9e43d553 100644 --- a/conf/config.go +++ b/conf/config.go @@ -21,8 +21,9 @@ type Server struct { //Tikv config is the config of tikv sdk type Tikv struct { - PdAddrs string `cfg:"pd-addrs;required; ;pd address in tidb"` - ZT ZT `cfg:"zt"` + PdAddrs string `cfg:"pd-addrs;required; ;pd address in tidb"` + MetaSlot int64 `cfg:"meta-slot;0;numeric;hashes slot key count"` + ZT ZT `cfg:"zt"` } //ZT config is the config of zlist diff --git a/conf/titan.toml b/conf/titan.toml index f4adfdb1..90e89ad0 100644 --- a/conf/titan.toml +++ b/conf/titan.toml @@ -24,11 +24,16 @@ auth = "" [server.tikv] #type: string -#rules: nonempty #description: pd address in tidb #required pd-addrs = "" +#type: int64 +#rules: numeric +#description: hashes slot key count +#default: 0 +#meta-slot = 0 + [server.tikv.zt] #type: int @@ -104,7 +109,7 @@ pd-addrs = "" #type: string #description: log level(debug, info, warn, error, panic, fatal) #default: info -#level = "debug" +#level = "info" #type: bool #rules: boolean @@ -116,3 +121,4 @@ pd-addrs = "" #description: log time rotate pattern(s m h D M W) #default: 0 0 0 * * * #time-rotate = "0 0 0 * * *" + diff --git a/db/db.go b/db/db.go index 3d63ce73..6d961e1b 100644 --- a/db/db.go +++ b/db/db.go @@ -111,7 +111,9 @@ func Open(conf *conf.Tikv) (*RedisStore, error) { go StartGC(sysdb) go StartExpire(sysdb) go StartZT(sysdb, &conf.ZT) - + if conf.MetaSlot != defaultHashMetaSlot { + defaultHashMetaSlot = conf.MetaSlot + } return rds, nil } @@ -252,6 +254,19 @@ func DataKey(db *DB, key []byte) []byte { return dkey } +// MetaSlotKey builds a meta slot key from a slot id +func MetaSlotKey(db *DB, objID, slotID []byte) []byte { + var skey []byte + skey = append(skey, []byte(db.Namespace)...) + skey = append(skey, ':') + skey = append(skey, db.ID.Bytes()...) + skey = append(skey, ':', 'M', 'S', ':') + skey = append(skey, objID...) + skey = append(skey, ':') + skey = append(skey, slotID...) + return skey +} + func flushLease(txn store.Transaction, key, id []byte, interval time.Duration) error { databytes := make([]byte, 24) copy(databytes, id) diff --git a/db/hash.go b/db/hash.go index fe444c22..b72c6f25 100644 --- a/db/hash.go +++ b/db/hash.go @@ -1,26 +1,86 @@ package db import ( - "encoding/json" + "bytes" + "encoding/binary" + "math/rand" "strconv" + + "github.com/meitu/titan/db/store" +) + +var ( + defaultHashMetaSlot int64 = 0 ) +//Slot slot information about hash meta +type Slot struct { + Len int64 + UpdatedAt int64 +} + +//EncodeSlot encodes slot data into byte slice +func EncodeSlot(s *Slot) []byte { + b := make([]byte, 16) + binary.BigEndian.PutUint64(b[:8], uint64(s.Len)) + binary.BigEndian.PutUint64(b[8:], uint64(s.UpdatedAt)) + return b +} + +// DecodeSlot decode slot data into slot field +func DecodeSlot(b []byte) (*Slot, error) { + if len(b) != 16 { + return nil, ErrInvalidLength + } + meta := &Slot{} + meta.Len = int64(binary.BigEndian.Uint64(b[:8])) + meta.UpdatedAt = int64(binary.BigEndian.Uint64(b[8:])) + return meta, nil +} + // HashMeta is the meta data of the hashtable type HashMeta struct { Object - Len int64 + Len int64 + MetaSlot int64 +} + +//Encode encodes meta data into byte slice +func (hm *HashMeta) Encode() []byte { + b := EncodeObject(&hm.Object) + meta := make([]byte, 16) + binary.BigEndian.PutUint64(meta[:8], uint64(hm.Len)) + binary.BigEndian.PutUint64(meta[8:], uint64(hm.MetaSlot)) + return append(b, meta...) +} + +//Decode decode meta data into meta field +func (hm *HashMeta) Decode(b []byte) error { + if len(b[ObjectEncodingLength:]) != 16 { + return ErrInvalidLength + } + obj, err := DecodeObject(b) + if err != nil { + return err + } + hm.Object = *obj + meta := b[ObjectEncodingLength:] + hm.Len = int64(binary.BigEndian.Uint64(meta[:8])) + hm.MetaSlot = int64(binary.BigEndian.Uint64(meta[8:])) + return nil } // Hash implements the hashtable type Hash struct { - meta HashMeta - key []byte - txn *Transaction + meta HashMeta + key []byte + exists bool + txn *Transaction } // GetHash returns a hash object, create new one if nonexists func GetHash(txn *Transaction, key []byte) (*Hash, error) { - hash := &Hash{txn: txn, key: key} + hash := &Hash{txn: txn, key: key, meta: HashMeta{}} mkey := MetaKey(txn.db, key) meta, err := txn.t.Get(mkey) @@ -34,40 +94,78 @@ func GetHash(txn *Transaction, key []byte) (*Hash, error) { hash.meta.Type = ObjectHash hash.meta.Encoding = ObjectEncodingHT hash.meta.Len = 0 + hash.meta.MetaSlot = defaultHashMetaSlot return hash, nil } return nil, err } - if err := json.Unmarshal(meta, &hash.meta); err != nil { + if err := hash.meta.Decode(meta); err != nil { return nil, err } if hash.meta.Type != ObjectHash { return nil, ErrTypeMismatch } + hash.exists = true return hash, nil } + func hashItemKey(key []byte, field []byte) []byte { key = append(key, ':') return append(key, field...) } +func slotGC(txn *Transaction, objID []byte) error { + key := MetaSlotKey(txn.db, objID, nil) + if err := gc(txn.t, key); err != nil { + return err + } + return nil +} + +func (hash *Hash) calculateSlotID(limit int64) int64 { + if !hash.isMetaSlot() || limit <= 1 { + return 0 + } + return rand.Int63n(limit - 1) +} + +func (hash *Hash) isMetaSlot() bool { + if hash.meta.MetaSlot != 0 { + return true + } + return false +} + // HDel removes the specified fields from the hash stored at key func (hash *Hash) HDel(fields [][]byte) (int64, error) { - var keys [][]byte - var num int64 + var ( + keys [][]byte + num int64 + ) dkey := DataKey(hash.txn.db, hash.meta.ID) for _, field := range fields { keys = append(keys, hashItemKey(dkey, field)) } - values, err := BatchGetValues(hash.txn, keys) + values, hlen, err := hash.delHash(keys) if err != nil { return 0, err } - for i, val := range values { - if val == nil { + if hlen == 0 { + return 0, nil + } + vlen := int64(len(values)) + if vlen >= hlen { + if err := hash.Destroy(); err != nil { + return 0, err + } + return vlen, nil + } + + for k, v := range values { + if v == nil { continue } - if err := hash.txn.t.Delete(keys[i]); err != nil { + if err := hash.txn.t.Delete([]byte(k)); err != nil { return 0, err } num++ @@ -75,16 +173,45 @@ func (hash *Hash) HDel(fields [][]byte) (int64, error) { if num == 0 { return 0, nil } - hash.meta.Len -= num - if hash.meta.Len == 0 { - return num, hash.Destory() - } - if err := hash.updateMeta(); err != nil { + + // update Len and UpdateAt + if err := hash.addLen(-num); err != nil { return 0, err } return num, nil } +func (hash *Hash) delHash(keys [][]byte) (map[string][]byte, int64, error) { + var ( + slots [][]byte + isMetaSlot = hash.isMetaSlot() + metaSlotKey = MetaSlotKey(hash.txn.db, hash.meta.ID, nil) + ) + if isMetaSlot { + metaSlotKeys := hash.getMetaSlotKeys() + keys = append(metaSlotKeys, keys...) + } + + kvMap, err := store.BatchGetValues(hash.txn.t, keys) + if err != nil { + return nil, 0, err + } + for k, v := range kvMap { + if isMetaSlot && bytes.Contains([]byte(k), metaSlotKey) { + slots = append(slots, v) + delete(kvMap, k) + } + } + if isMetaSlot && len(slots) > 0 { + slot, err := hash.calculateSlot(&slots) + if err != nil { + return nil, 0, err + } + return kvMap, slot.Len, nil + } + return kvMap, hash.meta.Len, nil +} + // HSet sets field in the hash stored at key to value func (hash *Hash) HSet(field []byte, value []byte) (int, error) { dkey := DataKey(hash.txn.db, hash.meta.ID) @@ -106,8 +233,7 @@ func (hash *Hash) HSet(field []byte, value []byte) (int, error) { if exist { return 0, nil } - hash.meta.Len++ - if err := hash.updateMeta(); err != nil { + if err := hash.addLen(1); err != nil { return 0, err } return 1, nil @@ -172,17 +298,29 @@ func (hash *Hash) HGetAll() ([][]byte, [][]byte, error) { return fields, vals, nil } -func (hash *Hash) updateMeta() error { - meta, err := json.Marshal(hash.meta) - if err != nil { +// Destroy the hash store +func (hash *Hash) Destroy() error { + metaKey := MetaKey(hash.txn.db, hash.key) + dataKey := DataKey(hash.txn.db, hash.meta.ID) + if err := hash.txn.t.Delete(metaKey); err != nil { + return err + } + if err := gc(hash.txn.t, dataKey); err != nil { return err } - return hash.txn.t.Set(MetaKey(hash.txn.db, hash.key), meta) -} -// Destory the hash store -func (hash *Hash) Destory() error { - return hash.txn.Destory(&hash.meta.Object, hash.key) + if hash.isMetaSlot() { + if err := slotGC(hash.txn, hash.meta.ID); err != nil { + return err + } + } + + if hash.meta.ExpireAt > 0 { + if err := unExpireAt(hash.txn.t, metaKey, hash.meta.ExpireAt); err != nil { + return err + } + } + return nil } // HExists returns if field is an existing field in the hash stored at key @@ -268,8 +406,32 @@ func (hash *Hash) HIncrByFloat(field []byte, v float64) (float64, error) { } // HLen returns the number of fields contained in the hash stored at key -func (hash *Hash) HLen() int64 { - return hash.meta.Len +func (hash *Hash) HLen() (int64, error) { + if hash.isMetaSlot() { + slot, err := hash.getAllSlot() + if err == nil { + return slot.Len, nil + } + if err != ErrKeyNotFound { + return 0, err + } + } + return hash.meta.Len, nil +} + +// Object new object from hash +func (hash *Hash) Object() (*Object, error) { + obj := hash.meta.Object + if hash.isMetaSlot() { + slot, err := hash.getAllSlot() + if err == nil { + obj.UpdatedAt = slot.UpdatedAt + } + if err != ErrKeyNotFound { + return nil, err + } + } + return &obj, nil } // HMGet returns the values associated with the specified fields in the hash stored at key @@ -305,3 +467,202 @@ func (hash *Hash) HMSet(fields [][]byte, values [][]byte) error { hash.meta.Len += added return hash.updateMeta() } + +// HMSet sets meta slot num +func (hash *Hash) HMSlot(metaSlot int64) error { + + if hash.isMetaSlot() { + slot, err := hash.getAllSlot() + + if err == ErrKeyNotFound { + slot = &Slot{Len: 0, UpdatedAt: Now()} + } else if err != nil { + return err + } + hash.meta.Len = slot.Len + hash.meta.UpdatedAt = slot.UpdatedAt + } + if err := hash.autoUpdateSlot(metaSlot); err != nil { + return err + } + hash.meta.MetaSlot = metaSlot + if err := hash.updateMeta(); err != nil { + return err + } + return nil +} + +func (hash *Hash) addLen(len int64) error { + isDirty := false + if hash.isMetaSlot() { + slotID := hash.calculateSlotID(hash.meta.MetaSlot) + if err := hash.addSlotLen(slotID, len); err != nil { + return err + } + } else { + hash.meta.Len += len + hash.meta.UpdatedAt = Now() + if err := hash.autoUpdateSlot(defaultHashMetaSlot); err == nil { + hash.meta.MetaSlot = defaultHashMetaSlot + } + isDirty = true + } + if isDirty || !hash.exists { + if err := hash.updateMeta(); err != nil { + return err + } + } + return nil +} + +func (hash *Hash) autoUpdateSlot(metaSlot int64) error { + isMetaSlot := hash.isMetaSlot() + if metaSlot < 0 { + return ErrInteger + } + if metaSlot == hash.meta.MetaSlot { + return nil + } + if metaSlot > hash.meta.MetaSlot { + if !isMetaSlot && hash.meta.Len > 0 { + slot := &Slot{Len: hash.meta.Len, UpdatedAt: Now()} + if err := hash.updateSlot(0, slot); err != nil { + return err + } + } + return nil + } + + if metaSlot < hash.meta.MetaSlot { + slot, err := hash.getSliceSlot(metaSlot-1, hash.meta.MetaSlot-1) + if err != nil { + if err == ErrKeyNotFound { + return nil + } + return err + } + sid := hash.calculateSlotID(metaSlot) + if err := hash.addSlotLen(sid, slot.Len); err != nil { + return err + } + if err := hash.clearSliceSlot(metaSlot, hash.meta.MetaSlot-1); err != nil { + return err + } + } + return nil +} + +func (hash *Hash) clearSliceSlot(start, end int64) error { + if start >= end || start < 0 || end < 1 { + return ErrOutOfRange + } + i := start + for i <= end { + metaSlotKey := MetaSlotKey(hash.txn.db, hash.meta.ID, EncodeInt64(i)) + if err := hash.txn.t.Delete(metaSlotKey); err != nil { + return err + } + i++ + } + return nil +} + +func (hash *Hash) addSlotLen(newID int64, len int64) error { + slot, err := hash.getSlot(newID) + if err != nil { + return err + } + slot.Len += len + slot.UpdatedAt = Now() + return hash.updateSlot(newID, slot) +} + +func (hash *Hash) getSlot(slotID int64) (*Slot, error) { + metaSlotKey := MetaSlotKey(hash.txn.db, hash.meta.ID, EncodeInt64(slotID)) + raw, err := hash.txn.t.Get(metaSlotKey) + if err != nil { + if IsErrNotFound(err) { + return &Slot{UpdatedAt: Now()}, nil + } + return nil, err + } + slot, err := DecodeSlot(raw) + if err != nil { + return nil, err + } + return slot, nil +} + +func (hash *Hash) updateMeta() error { + meta := hash.meta.Encode() + return hash.txn.t.Set(MetaKey(hash.txn.db, hash.key), meta) +} + +func (hash *Hash) updateSlot(slotID int64, slot *Slot) error { + slotKey := MetaSlotKey(hash.txn.db, hash.meta.ID, EncodeInt64(slotID)) + metaSlot := EncodeSlot(slot) + return hash.txn.t.Set(slotKey, metaSlot) +} + +func (hash *Hash) getMetaSlotKeys() [][]byte { + metaSlot := hash.meta.MetaSlot + keys := make([][]byte, metaSlot) + for metaSlot > 0 { + keys = append(keys, MetaSlotKey(hash.txn.db, hash.meta.ID, EncodeInt64(metaSlot))) + metaSlot-- + } + return keys +} + +func (hash *Hash) getAllSlot() (*Slot, error) { + return hash.getSliceSlot(0, hash.meta.MetaSlot-1) +} + +func (hash *Hash) getSliceSlot(start, end int64) (*Slot, error) { + if start >= end { + return nil, ErrOutOfRange + } + var rawSlots [][]byte + prefixKey := MetaSlotKey(hash.txn.db, hash.meta.ID, nil) + startKey := MetaSlotKey(hash.txn.db, hash.meta.ID, EncodeInt64(start)) + iter, err := hash.txn.t.Seek(startKey) + if err != nil { + return nil, err + } + count := start + for iter.Valid() && iter.Key().HasPrefix(prefixKey) && count <= end { + rawSlots = append(rawSlots, iter.Value()) + if err := iter.Next(); err != nil { + break + } + count++ + } + + if len(rawSlots) > 0 { + slot, err := hash.calculateSlot(&rawSlots) + if err != nil { + return nil, err + } + + return slot, nil + } + return nil, ErrKeyNotFound +} + +func (hash *Hash) calculateSlot(vals *[][]byte) (*Slot, error) { + slot := &Slot{} + for _, val := range *vals { + if val == nil { + continue + } + meta, err := DecodeSlot(val) + if err != nil { + return nil, err + } + slot.Len += meta.Len + if meta.UpdatedAt > slot.UpdatedAt { + slot.UpdatedAt = meta.UpdatedAt + } + } + return slot, nil +} diff --git a/db/kv.go b/db/kv.go index 1aa49bd3..688755f0 100644 --- a/db/kv.go +++ b/db/kv.go @@ -78,7 +78,15 @@ func (kv *Kv) Delete(keys [][]byte) (int64, error) { if IsExpired(obj, now) { continue } - if err := kv.txn.Destory(obj, mapping[k]); err != nil { + if obj.Type == ObjectHash { + hash, err := kv.txn.Hash(mapping[k]) + if err != nil { + continue + } + if err := hash.Destroy(); err != nil { + continue + } + } else if err := kv.txn.Destory(obj, mapping[k]); err != nil { continue } count++ diff --git a/db/object.go b/db/object.go index 506b74dc..a9030ef6 100644 --- a/db/object.go +++ b/db/object.go @@ -129,7 +129,6 @@ func (txn *Transaction) Object(key []byte) (*Object, error) { if IsExpired(obj, Now()) { return nil, ErrKeyNotFound } - return obj, nil }