Skip to content

Commit

Permalink
Merge pull request #50 from meitu/feature/gc-interval-conf
Browse files Browse the repository at this point in the history
 move interval to config file from gc/expire work
  • Loading branch information
YIDWang committed Feb 12, 2019
2 parents 4e1b941 + 888f535 commit 40bca27
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 58 deletions.
6 changes: 1 addition & 5 deletions command/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@ import (
"github.com/meitu/titan/conf"
"github.com/meitu/titan/context"
"github.com/meitu/titan/db"
"github.com/meitu/titan/db/store"
)

var cfg = &conf.Tikv{
PdAddrs: store.MockAddr,
DB: conf.DB{},
}
var cfg = &conf.MockConf().Tikv
var mockdb *db.RedisStore

func init() {
Expand Down
33 changes: 26 additions & 7 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package conf

import "time"

//Titan configuration center
// Titan configuration center
type Titan struct {
Server Server `cfg:"server"`
Status Status `cfg:"status"`
Expand All @@ -12,45 +12,64 @@ type Titan struct {
PIDFileName string `cfg:"pid-filename; titan.pid; ; the file name to record connd PID"`
}

// DB config is the config of titan data struct
type DB struct {
Hash Hash `cfg:"hash"`
}

// Hash config is the config of titan hash data struct
type Hash struct {
MetaSlot int64 `cfg:"meta-slot;0;numeric;hashes slot key count"`
}

//Server config is the config of titan server
// Server config is the config of titan server
type Server struct {
Auth string `cfg:"auth;;;client connetion auth"`
Listen string `cfg:"listen; 0.0.0.0:7369; netaddr; address to listen"`
MaxConnection int64 `cfg:"max-connection;1000;numeric;client connection count"`
}

//Tikv config is the config of tikv sdk
// Tikv config is the config of tikv sdk
type Tikv struct {
PdAddrs string `cfg:"pd-addrs;required; ;pd address in tidb"`
DB DB `cfg:"db"`
GC GC `cfg:"gc"`
Expire Expire `cfg:"expire"`
ZT ZT `cfg:"zt"`
TikvGC TikvGC `cfg:"tikv-gc"`
}

// TikvGC config is the config of implement tikv sdk gcwork
type TikvGC struct {
Interval time.Duration `cfg:"interval;20m;;gc work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;30m;;lease flush leader interval"`
SafePointLifeTime time.Duration `cfg:"safe-point-life-time;10m;;safe point life time "`
Concurrency int `cfg:"concurrency;2;;gc work concurrency"`
}

//ZT config is the config of zlist
// GC config is the config of Titan GC work
type GC struct {
Interval time.Duration `cfg:"interval;1s;;gc work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
}

// Expire config is the config of Titan expire work
type Expire struct {
Interval time.Duration `cfg:"interval;1s;;expire work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
}

// ZT config is the config of zlist
type ZT struct {
Wrokers int `cfg:"workers;5;numeric;parallel workers count"`
BatchCount int `cfg:"batch;10;numeric;object transfer limitation per-transection"`
QueueDepth int `cfg:"depth;100;numeric;ZT Worker queue depth"`
Interval time.Duration `cfg:"interval;1000ms; ;Queue fill interval in milsecond"`
}

//Logger config is the config of default zap log
// Logger config is the config of default zap log
type Logger struct {
Name string `cfg:"name; titan; ; the default logger name"`
Path string `cfg:"path; logs/titan; ; the default log path"`
Expand All @@ -59,15 +78,15 @@ type Logger struct {
TimeRotate string `cfg:"time-rotate; 0 0 0 * * *; ; log time rotate pattern(s m h D M W)"`
}

//TikvLogger config is the config of tikv log
// TikvLogger config is the config of tikv log
type TikvLogger struct {
Path string `cfg:"path; logs/tikv;nonempty ; the default log path"`
Level string `cfg:"level; info; ; log level(debug, info, warn, error, panic, fatal)"`
Compress bool `cfg:"compress; false; boolean; true for enabling log compress"`
TimeRotate string `cfg:"time-rotate; 0 0 0 * * *; ; log time rotate pattern(s m h D M W)"`
}

//Status config is the config of exported server
// Status config is the config of exported server
type Status struct {
Listen string `cfg:"listen;0.0.0.0:7345;nonempty; listen address of http server"`
}
34 changes: 34 additions & 0 deletions conf/mockconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package conf

import "time"

// MockConf init and return titan mock conf
func MockConf() *Titan {
return &Titan{
Tikv: Tikv{
PdAddrs: "mocktikv://",
GC: GC{
Interval: time.Second,
LeaderLifeTime: 3 * time.Minute,
BatchLimit: 256,
},
Expire: Expire{
Interval: time.Second,
LeaderLifeTime: 3 * time.Minute,
BatchLimit: 256,
},
ZT: ZT{
Wrokers: 5,
BatchCount: 10,
QueueDepth: 100,
Interval: 1000 * time.Millisecond,
},
TikvGC: TikvGC{
Interval: 20 * time.Minute,
LeaderLifeTime: 30 * time.Minute,
SafePointLifeTime: 10 * time.Minute,
Concurrency: 2,
},
},
}
}
38 changes: 38 additions & 0 deletions conf/titan.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,44 @@ pd-addrs = ""
#meta-slot = 0


[tikv.gc]

#type: time.Duration
#description: gc work tick interval
#default: 1s
#interval = "1s"

#type: time.Duration
#description: lease flush leader interval
#default: 3m
#leader-life-time = "3m0s"

#type: int
#rules: numeric
#description: key count limitation per-transection
#default: 256
#batch-limit = 256


[tikv.expire]

#type: time.Duration
#description: expire work tick interval
#default: 1s
#interval = "1s"

#type: time.Duration
#description: lease flush leader interval
#default: 3m
#leader-life-time = "3m0s"

#type: int
#rules: numeric
#description: key count limitation per-transection
#default: 256
#batch-limit = 256


[tikv.zt]

#type: int
Expand Down
4 changes: 2 additions & 2 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func Open(conf *conf.Tikv) (*RedisStore, error) {
}
rds := &RedisStore{Storage: s, conf: conf}
sysdb := rds.DB(sysNamespace, sysDatabaseID)
go StartGC(sysdb)
go StartExpire(sysdb)
go StartGC(sysdb, &conf.GC)
go StartExpire(sysdb, &conf.Expire)
go StartZT(sysdb, &conf.ZT)
go StartTikvGC(sysdb, &conf.TikvGC)
return rds, nil
Expand Down
6 changes: 3 additions & 3 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
conf := &conf.Tikv{}
mockConf := conf.MockConf()
mockDB = &DB{
Namespace: "mockdb-ns",
ID: 1,
kv: &RedisStore{Storage: store, conf: conf},
conf: &conf.DB,
kv: &RedisStore{Storage: store, conf: &mockConf.Tikv},
conf: &mockConf.Tikv.DB,
}

os.Exit(m.Run())
Expand Down
25 changes: 10 additions & 15 deletions db/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,15 @@ import (
"context"
"time"

"github.com/meitu/titan/conf"
"github.com/meitu/titan/db/store"
"github.com/meitu/titan/metrics"
"go.uber.org/zap"
)

const (
expireBatchLimit = 256
expireTick = time.Duration(time.Second)
)

var (
expireKeyPrefix = []byte("$sys:0:at:")
sysExpireLeader = []byte("$sys:0:EXL:EXLeader")
sysExpireLeaderFlushInterval = 10 * time.Second
expireKeyPrefix = []byte("$sys:0:at:")
sysExpireLeader = []byte("$sys:0:EXL:EXLeader")

// $sys:0:at:{ts}:{metaKey}
expireTimestampOffset = len(expireKeyPrefix)
Expand Down Expand Up @@ -84,12 +79,12 @@ func unExpireAt(txn store.Transaction, mkey []byte, expireAt int64) error {
}

// StartExpire get leader from db
func StartExpire(db *DB) error {
ticker := time.NewTicker(expireTick)
func StartExpire(db *DB, conf *conf.Expire) error {
ticker := time.NewTicker(conf.Interval)
defer ticker.Stop()
id := UUID()
for range ticker.C {
isLeader, err := isLeader(db, sysExpireLeader, id, sysExpireLeaderFlushInterval)
isLeader, err := isLeader(db, sysExpireLeader, id, conf.LeaderLifeTime)
if err != nil {
zap.L().Error("[Expire] check expire leader failed", zap.Error(err))
continue
Expand All @@ -98,7 +93,7 @@ func StartExpire(db *DB) error {
zap.L().Debug("[Expire] not expire leader")
continue
}
runExpire(db)
runExpire(db, conf.BatchLimit)
}
return nil
}
Expand All @@ -121,7 +116,7 @@ func toTikvDataKey(namespace []byte, id DBID, key []byte) []byte {
return b
}

func runExpire(db *DB) {
func runExpire(db *DB, batchLimit int) {
txn, err := db.Begin()
if err != nil {
zap.L().Error("[Expire] txn begin failed", zap.Error(err))
Expand All @@ -133,7 +128,7 @@ func runExpire(db *DB) {
txn.Rollback()
return
}
limit := expireBatchLimit
limit := batchLimit
now := time.Now().UnixNano()
for iter.Valid() && iter.Key().HasPrefix(expireKeyPrefix) && limit > 0 {
key := iter.Key()
Expand Down Expand Up @@ -190,5 +185,5 @@ func runExpire(db *DB) {
txn.Rollback()
zap.L().Error("[Expire] commit failed", zap.Error(err))
}
metrics.GetMetrics().ExpireKeysTotal.WithLabelValues("expired").Add(float64(expireBatchLimit - limit))
metrics.GetMetrics().ExpireKeysTotal.WithLabelValues("expired").Add(float64(batchLimit - limit))
}
28 changes: 12 additions & 16 deletions db/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@ import (
"context"
"time"

"github.com/meitu/titan/conf"
"github.com/meitu/titan/db/store"
"github.com/meitu/titan/metrics"
"go.uber.org/zap"
)

var (
sysGCLeader = []byte("$sys:0:GCL:GCLeader")
gcInterval = time.Duration(1) //TODO 使用配置
)

const (
sysGCBurst = 256
sysGCLeaseFlushInterval = 10 * time.Second
)

func toTikvGCKey(key []byte) []byte {
Expand Down Expand Up @@ -57,8 +52,8 @@ func gcGetPrefix(txn store.Transaction) ([]byte, error) {
return key[len(gcPrefix):], nil
}

func gcDeleteRange(txn store.Transaction, prefix []byte, limit int64) (int64, error) {
count := int64(0)
func gcDeleteRange(txn store.Transaction, prefix []byte, limit int) (int, error) {
var count int
itr, err := txn.Iter(prefix, nil)
if err != nil {
return count, err
Expand Down Expand Up @@ -91,7 +86,7 @@ func gcComplete(txn store.Transaction, prefix []byte) error {
return txn.Delete(toTikvGCKey(prefix))
}

func doGC(db *DB, limit int64) error {
func doGC(db *DB, limit int) error {
left := limit
for left > 0 {
txn, err := db.Begin()
Expand All @@ -108,8 +103,8 @@ func doGC(db *DB, limit int64) error {
zap.L().Debug("[GC] no gc item")
return nil
}
count := int64(0)
zap.L().Debug("[GC] start to delete prefix", zap.String("prefix", string(prefix)), zap.Int64("limit", limit))
count := 0
zap.L().Debug("[GC] start to delete prefix", zap.String("prefix", string(prefix)), zap.Int("limit", limit))
if count, err = gcDeleteRange(txn.t, prefix, limit); err != nil {
return err
}
Expand Down Expand Up @@ -140,11 +135,12 @@ func doGC(db *DB, limit int64) error {
// StartGC start gc
//1.获取leader许可
//2.leader 执行清理任务
func StartGC(db *DB) {
ticker := time.Tick(gcInterval * time.Second)
func StartGC(db *DB, conf *conf.GC) {
ticker := time.NewTicker(conf.Interval)
defer ticker.Stop()
id := UUID()
for range ticker {
isLeader, err := isLeader(db, sysGCLeader, id, sysGCLeaseFlushInterval)
for range ticker.C {
isLeader, err := isLeader(db, sysGCLeader, id, conf.LeaderLifeTime)
if err != nil {
zap.L().Error("[GC] check GC leader failed", zap.Error(err))
continue
Expand All @@ -153,7 +149,7 @@ func StartGC(db *DB) {
zap.L().Debug("[GC] not GC leader")
continue
}
if err := doGC(db, sysGCBurst); err != nil {
if err := doGC(db, conf.BatchLimit); err != nil {
zap.L().Error("[GC] do GC failed", zap.Error(err))
continue
}
Expand Down
3 changes: 2 additions & 1 deletion db/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func MockDB() *DB {
if err != nil {
panic(err)
}
redis := &RedisStore{Storage: store, conf: &conf.Tikv{}}
mockConf := conf.MockConf()
redis := &RedisStore{Storage: store, conf: &mockConf.Tikv}
return &DB{Namespace: "ns", ID: DBID(1), kv: redis}
}
5 changes: 3 additions & 2 deletions db/tikvgc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ const (

// StartTikvGC start tikv gcwork
func StartTikvGC(db *DB, tikvCfg *conf.TikvGC) {
ticker := time.Tick(tikvCfg.Interval)
ticker := time.NewTicker(tikvCfg.Interval)
defer ticker.Stop()
uuid := UUID()
ctx := context.Background()
for range ticker {
for range ticker.C {
isLeader, err := isLeader(db, sysTikvGCLeader, uuid, tikvCfg.LeaderLifeTime)
if err != nil {
zap.L().Error("[TikvGC] check TikvGC leader failed", zap.Error(err))
Expand Down

0 comments on commit 40bca27

Please sign in to comment.